Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / datastore / AsyncCloudDatastoreV1ServiceImpl.java
blob5c654c7af6c54b69902f7526df0700499cdbd328
1 package com.google.appengine.api.datastore;
3 import static com.google.appengine.api.datastore.DatastoreAttributes.DatastoreType.MASTER_SLAVE;
4 import static com.google.appengine.api.datastore.ReadPolicy.Consistency.EVENTUAL;
6 import com.google.appengine.api.datastore.Batcher.ReorderingMultiFuture;
7 import com.google.appengine.api.datastore.DatastoreService.KeyRangeState;
8 import com.google.appengine.api.datastore.DatastoreServiceConfig.ApiVersion;
9 import com.google.appengine.api.datastore.FutureHelper.MultiFuture;
10 import com.google.appengine.api.datastore.Index.IndexState;
11 import com.google.appengine.api.utils.FutureWrapper;
12 import com.google.common.collect.ImmutableList;
13 import com.google.common.collect.Lists;
14 import com.google.common.collect.Maps;
15 import com.google.datastore.v1beta3.AllocateIdsRequest;
16 import com.google.datastore.v1beta3.AllocateIdsResponse;
17 import com.google.datastore.v1beta3.BeginTransactionRequest;
18 import com.google.datastore.v1beta3.BeginTransactionResponse;
19 import com.google.datastore.v1beta3.CommitRequest;
20 import com.google.datastore.v1beta3.CommitResponse;
21 import com.google.datastore.v1beta3.EntityResult;
22 import com.google.datastore.v1beta3.Key.PathElement;
23 import com.google.datastore.v1beta3.Key.PathElement.IdTypeCase;
24 import com.google.datastore.v1beta3.LookupRequest;
25 import com.google.datastore.v1beta3.LookupResponse;
26 import com.google.datastore.v1beta3.Mutation;
27 import com.google.datastore.v1beta3.MutationResult;
28 import com.google.datastore.v1beta3.ReadOptions;
29 import com.google.protobuf.Message;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.TimeoutException;
/**
 * An implementation of {@link AsyncDatastoreService} using the Cloud Datastore v1 API.
 */
46 class AsyncCloudDatastoreV1ServiceImpl extends BaseAsyncDatastoreServiceImpl {
48 /**
49 * A base batcher for Cloud Datastore v1 operations executed in the context of an {@link
50 * AsyncCloudDatastoreV1ServiceImpl}.
52 * @param <S> the response message type
53 * @param <R> the request message builder type
54 * @param <F> the Java specific representation of a value
55 * @param <T> the proto representation of value
57 private abstract class V1Batcher<S extends Message, R extends Message.Builder, F,
58 T extends Message> extends BaseRpcBatcher<S, R, F, T> {
59 @Override
60 @SuppressWarnings("unchecked")
61 final R newBatch(R baseBatch) {
62 return (R) baseBatch.clone();
66 private final V1Batcher<CommitResponse, CommitRequest.Builder, Key, Mutation> deleteBatcher =
67 new V1Batcher<CommitResponse, CommitRequest.Builder, Key, Mutation>() {
68 @Override
69 void addToBatch(Mutation mutation, CommitRequest.Builder batch) {
70 batch.addMutations(mutation);
73 @Override
74 int getMaxCount() {
75 return datastoreServiceConfig.maxBatchWriteEntities;
78 @Override
79 protected Future<CommitResponse> makeCall(CommitRequest.Builder batch) {
80 return datastoreProxy.commit(batch.build());
83 @Override
84 final Object getGroup(Key key) {
85 return key.getRootKey();
88 @Override
89 final Mutation toPb(Key value) {
90 return Mutation.newBuilder()
91 .setOp(Mutation.Operation.DELETE)
92 .setKey(DataTypeTranslator.toV1Key(value))
93 .build();
97 private final V1Batcher<LookupResponse, LookupRequest.Builder, Key,
98 com.google.datastore.v1beta3.Key> lookupByKeyBatcher =
99 new V1Batcher<LookupResponse, LookupRequest.Builder, Key,
100 com.google.datastore.v1beta3.Key>() {
101 @Override
102 void addToBatch(com.google.datastore.v1beta3.Key key, LookupRequest.Builder batch) {
103 batch.addKeys(key);
106 @Override
107 int getMaxCount() {
108 return datastoreServiceConfig.maxBatchReadEntities;
111 @Override
112 protected Future<LookupResponse> makeCall(LookupRequest.Builder batch) {
113 return datastoreProxy.lookup(batch.build());
116 @Override
117 final Object getGroup(Key key) {
118 return key.getRootKey();
121 @Override
122 final com.google.datastore.v1beta3.Key toPb(Key value) {
123 return DataTypeTranslator.toV1Key(value).build();
127 private final V1Batcher<LookupResponse, LookupRequest.Builder, com.google.datastore.v1beta3.Key,
128 com.google.datastore.v1beta3.Key>
129 lookupByPbBatcher =
130 new V1Batcher<LookupResponse, LookupRequest.Builder, com.google.datastore.v1beta3.Key,
131 com.google.datastore.v1beta3.Key>() {
132 @Override
133 void addToBatch(com.google.datastore.v1beta3.Key key, LookupRequest.Builder batch) {
134 batch.addKeys(key);
137 @Override
138 int getMaxCount() {
139 return datastoreServiceConfig.maxBatchReadEntities;
142 @Override
143 protected Future<LookupResponse> makeCall(LookupRequest.Builder batch) {
144 return datastoreProxy.lookup(batch.build());
147 @Override
148 final Object getGroup(com.google.datastore.v1beta3.Key key) {
149 return key.getPath(0);
152 @Override
153 final com.google.datastore.v1beta3.Key toPb(com.google.datastore.v1beta3.Key value) {
154 return value;
158 private final V1Batcher<CommitResponse, CommitRequest.Builder, Entity, Mutation>
159 putBatcher = new V1Batcher<CommitResponse, CommitRequest.Builder, Entity, Mutation>() {
160 @Override
161 void addToBatch(Mutation mutation, CommitRequest.Builder batch) {
162 batch.addMutations(mutation);
165 @Override
166 int getMaxCount() {
167 return datastoreServiceConfig.maxBatchWriteEntities;
170 @Override
171 protected Future<CommitResponse> makeCall(CommitRequest.Builder batch) {
172 return datastoreProxy.commit(batch.build());
175 @Override
176 final Object getGroup(Entity value) {
177 return value.getKey().getRootKey();
180 @Override
181 final Mutation toPb(Entity value) {
182 return Mutation.newBuilder()
183 .setOp(Mutation.Operation.UPSERT)
184 .setEntity(DataTypeTranslator.toV1Entity(value))
185 .build();
189 private final V1Batcher<AllocateIdsResponse, AllocateIdsRequest.Builder, Key,
190 com.google.datastore.v1beta3.Key>
191 allocateIdsBatcher =
192 new V1Batcher<AllocateIdsResponse, AllocateIdsRequest.Builder, Key,
193 com.google.datastore.v1beta3.Key>() {
194 @Override
195 void addToBatch(com.google.datastore.v1beta3.Key key, AllocateIdsRequest.Builder batch) {
196 batch.addKeys(key);
199 @Override
200 int getMaxCount() {
201 return datastoreServiceConfig.maxBatchAllocateIdKeys;
204 @Override
205 protected Future<AllocateIdsResponse> makeCall(AllocateIdsRequest.Builder batch) {
206 return datastoreProxy.allocateIds(batch.build());
209 @Override
210 final Object getGroup(Key key) {
211 Key parent = key.getParent();
212 if (parent == null) {
213 return PathElement.getDefaultInstance();
214 } else {
215 return DataTypeTranslator.toV1Key(parent).getPath(0);
219 @Override
220 final com.google.datastore.v1beta3.Key toPb(Key value) {
221 return DataTypeTranslator.toV1Key(value).build();
/**
 * Proxy through which this service issues its Cloud Datastore v1 calls (commit, lookup,
 * allocateIds and beginTransaction); assigned once in the constructor.
 */
225 private final CloudDatastoreV1Proxy datastoreProxy;
/**
 * Creates a service that issues its calls through the given proxy.
 *
 * @param datastoreServiceConfig configuration handed to the superclass and to the query runner
 * @param datastoreProxy proxy used for every RPC; also shared with the query runner
 * @param defaultTxnProvider transaction stack handed to the superclass
 */
public AsyncCloudDatastoreV1ServiceImpl(
    DatastoreServiceConfig datastoreServiceConfig,
    CloudDatastoreV1Proxy datastoreProxy,
    TransactionStack defaultTxnProvider) {
  super(
      datastoreServiceConfig,
      defaultTxnProvider,
      new QueryRunnerCloudDatastoreV1(datastoreServiceConfig, datastoreProxy));
  this.datastoreProxy = datastoreProxy;
}
235 @Override
236 protected TransactionImpl.InternalTransaction doBeginTransaction(TransactionOptions options) {
237 BeginTransactionRequest.Builder request = BeginTransactionRequest.newBuilder();
239 Future<BeginTransactionResponse> future = datastoreProxy.beginTransaction(request.build());
241 ApiVersion apiVersion = datastoreServiceConfig.getApiVersion();
242 switch (apiVersion) {
243 case CLOUD_DATASTORE_V1_VIA_API_PROXY:
244 case CLOUD_DATASTORE_V1_REMOTE:
245 return InternalTransactionCloudDatastoreV1.create(datastoreProxy, future);
246 default:
247 throw new IllegalStateException("Unsupported api version: " + apiVersion);
251 @Override
252 protected Future<Map<Key, Entity>> doBatchGet( Transaction txn,
253 final Set<Key> keysToGet, final Map<Key, Entity> resultMap) {
254 final LookupRequest.Builder baseReq = LookupRequest.newBuilder();
255 ReadOptions.Builder readOptionsBuilder = baseReq.getReadOptionsBuilder();
256 if (txn != null) {
257 TransactionImpl.ensureTxnActive(txn);
258 readOptionsBuilder.setTransaction(
259 InternalTransactionCloudDatastoreV1.getById(txn.getId()).getHandle());
260 } else if (datastoreServiceConfig.getReadPolicy().getConsistency() == EVENTUAL) {
261 readOptionsBuilder.setReadConsistency(ReadOptions.ReadConsistency.EVENTUAL);
262 } else {
263 baseReq.clearReadOptions();
266 final boolean shouldUseMultipleBatches = getDatastoreType() != MASTER_SLAVE && txn == null
267 && datastoreServiceConfig.getReadPolicy().getConsistency() != EVENTUAL;
269 Iterator<LookupRequest.Builder> batches = lookupByKeyBatcher.getBatches(keysToGet, baseReq,
270 baseReq.build().getSerializedSize(), shouldUseMultipleBatches);
271 List<Future<LookupResponse>> futures = lookupByKeyBatcher.makeCalls(batches);
273 return registerInTransaction(txn, new MultiFuture<LookupResponse, Map<Key, Entity>>(futures) {
275 * A Map from a Key without an app id specified to the corresponding Key that the
276 * user requested. This is a workaround for the Remote API to support matching requested
277 * Keys to Entities that may be from a different app id.
279 private Map<com.google.datastore.v1beta3.Key, Key> keyMapIgnoringAppId;
281 @Override
282 public Map<Key, Entity> get() throws InterruptedException, ExecutionException {
283 try {
284 aggregate(futures, null, null);
285 } catch (TimeoutException e) {
286 throw new RuntimeException(e);
288 return resultMap;
291 @Override
292 public Map<Key, Entity> get(long timeout, TimeUnit unit)
293 throws InterruptedException, ExecutionException, TimeoutException {
294 aggregate(futures, timeout, unit);
295 return resultMap;
299 * Aggregates the results of the given Futures and issues (synchronous) followup requests if
300 * any results were deferred.
302 * @param currentFutures the Futures corresponding to the batches of the initial
303 * LookupRequests.
304 * @param timeout the timeout to use while waiting on the Future, or null for none.
305 * @param timeoutUnit the unit of the timeout, or null for none.
307 private void aggregate(
308 Iterable<Future<LookupResponse>> currentFutures, Long timeout, TimeUnit timeoutUnit)
309 throws ExecutionException, InterruptedException, TimeoutException {
310 while (true) {
311 List<com.google.datastore.v1beta3.Key> deferredKeys = Lists.newArrayList();
313 for (Future<LookupResponse> currentFuture : currentFutures) {
314 LookupResponse resp = getFutureWithOptionalTimeout(currentFuture, timeout, timeoutUnit);
315 addEntitiesToResultMap(resp);
316 deferredKeys.addAll(resp.getDeferredList());
319 if (deferredKeys.isEmpty()) {
320 break;
323 Iterator<LookupRequest.Builder> followupBatches = lookupByPbBatcher.getBatches(
324 deferredKeys, baseReq, baseReq.build().getSerializedSize(), shouldUseMultipleBatches);
325 currentFutures = lookupByPbBatcher.makeCalls(followupBatches);
330 * Convenience method to get the result of a Future and optionally specify a timeout.
332 * @param future the Future to get.
333 * @param timeout the timeout to use while waiting on the Future, or null for none.
334 * @param timeoutUnit the unit of the timeout, or null for none.
335 * @return the result of the Future.
336 * @throws TimeoutException will only ever be thrown if a timeout is provided.
338 private LookupResponse getFutureWithOptionalTimeout(
339 Future<LookupResponse> future, Long timeout, TimeUnit timeoutUnit)
340 throws ExecutionException, InterruptedException, TimeoutException {
341 if (timeout == null) {
342 return future.get();
343 } else {
344 return future.get(timeout, timeoutUnit);
349 * Adds the Entities from the LookupResponse to the resultMap. Will omit Keys that were
350 * missing. Handles Keys with different App Ids from the Entity.Key. See
351 * {@link #findKeyFromRequestIgnoringAppId}.
353 private void addEntitiesToResultMap(LookupResponse response) {
354 for (EntityResult entityResult : response.getFoundList()) {
355 Entity responseEntity = DataTypeTranslator.toEntity(entityResult.getEntity());
356 Key responseKey = responseEntity.getKey();
358 if (!keysToGet.contains(responseKey)) {
359 responseKey = findKeyFromRequestIgnoringAppId(entityResult.getEntity().getKey());
361 resultMap.put(responseKey, responseEntity);
366 * This is a hack to support calls going through the Remote API. The problem is:
368 * The requested Key may have a local app id.
369 * The returned Entity may have a remote app id.
371 * In this case, we want to return a Map.Entry with {LocalKey, RemoteEntity}. This way, the
372 * user can always do map.get(keyFromRequest).
374 * This method will find the corresponding requested LocalKey for a RemoteKey by ignoring the
375 * AppId field.
377 * Note that we used to be able to rely on the order of the Response Entities matching the
378 * order of Request Keys. We can no longer do so with the addition of Deferred results.
380 * @param keyFromResponse the key from the Response that did not match any of the requested
381 * Keys.
382 * @return the Key from the request that corresponds to the given Key from the Response
383 * (ignoring AppId.)
385 private Key findKeyFromRequestIgnoringAppId(
386 com.google.datastore.v1beta3.Key keyFromResponse) {
387 if (keyMapIgnoringAppId == null) {
388 keyMapIgnoringAppId = Maps.newHashMap();
389 for (Key requestKey : keysToGet) {
390 com.google.datastore.v1beta3.Key.Builder requestKeyAsRefWithoutApp =
391 DataTypeTranslator.toV1Key(requestKey);
392 requestKeyAsRefWithoutApp.getPartitionIdBuilder().clearProjectId();
393 keyMapIgnoringAppId.put(requestKeyAsRefWithoutApp.build(), requestKey);
397 com.google.datastore.v1beta3.Key.Builder keyBuilder = keyFromResponse.toBuilder();
398 keyBuilder.getPartitionIdBuilder().clearProjectId();
399 Key result = keyMapIgnoringAppId.get(keyBuilder.build());
400 if (result == null) {
401 throw new DatastoreFailureException("Internal error");
403 return result;
408 @Override
409 protected Future<List<Key>> doBatchPut( final Transaction txn,
410 final List<Entity> entities) {
411 if (txn == null) {
412 CommitRequest.Builder baseReq = CommitRequest.newBuilder();
413 baseReq.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
414 List<Integer> order = Lists.newArrayListWithCapacity(entities.size());
415 Iterator<CommitRequest.Builder> batches = putBatcher.getBatches(entities, baseReq,
416 baseReq.build().getSerializedSize(), true, order);
417 List<Future<CommitResponse>> futures = putBatcher.makeCalls(batches);
419 return new ReorderingMultiFuture<CommitResponse, List<Key>>(futures, order) {
420 @Override
421 protected List<Key> aggregate(CommitResponse intermediateResult, Iterator<Integer> indexItr,
422 List<Key> result) {
423 for (MutationResult mutationResult : intermediateResult.getMutationResultsList()) {
424 int index = indexItr.next();
425 Key key = entities.get(index).getKey();
426 if (mutationResult.hasKey()) {
427 List<PathElement> pathElements =
428 mutationResult.getKey().getPathList();
429 key.setId(pathElements.get(pathElements.size() - 1).getId());
431 result.set(index, key);
433 return result;
436 @Override
437 protected List<Key> initResult(int size) {
438 List<Key> keyList = Lists.newArrayListWithCapacity(size);
439 keyList.addAll(Collections.<Key>nCopies(size, null));
440 return keyList;
445 TransactionImpl.ensureTxnActive(txn);
446 final InternalTransactionCloudDatastoreV1 txnV1 =
447 InternalTransactionCloudDatastoreV1.getById(txn.getId());
449 ImmutableList.Builder<Key> keyListBuilder = ImmutableList.builder();
450 final List<Key> incompleteKeys = Lists.newArrayList();
451 final List<com.google.datastore.v1beta3.Entity.Builder> incompleteEntityBldrs =
452 Lists.newArrayList();
453 for (Entity entity : entities) {
454 Key key = entity.getKey();
455 keyListBuilder.add(key);
456 if (key.isComplete()) {
457 txnV1.deferPut(entity);
458 } else {
459 com.google.datastore.v1beta3.Entity.Builder entityV1 =
460 com.google.datastore.v1beta3.Entity.newBuilder();
461 DataTypeTranslator.addPropertiesToPb(entity.getPropertyMap(), entityV1);
462 incompleteEntityBldrs.add(entityV1);
463 incompleteKeys.add(key);
466 final List<Key> allKeys = keyListBuilder.build();
468 if (incompleteKeys.isEmpty()) {
469 return new FutureHelper.FakeFuture<List<Key>>(allKeys);
471 return registerInTransaction(txn,
472 new FutureWrapper<List<com.google.datastore.v1beta3.Key>,
473 List<Key>>(allocateIds(incompleteKeys)) {
474 @Override
475 protected List<Key> wrap(List<com.google.datastore.v1beta3.Key> completedKeyPbs) {
476 Iterator<com.google.datastore.v1beta3.Entity.Builder> entityPbBldrIt =
477 incompleteEntityBldrs.iterator();
478 Iterator<Key> incompleteKeysIt = incompleteKeys.iterator();
479 for (com.google.datastore.v1beta3.Key keyV1 : completedKeyPbs) {
480 updateKey(keyV1, incompleteKeysIt.next());
481 txnV1.deferPut(entityPbBldrIt.next().setKey(keyV1));
483 return allKeys;
486 @Override
487 protected Throwable convertException(Throwable cause) {
488 return cause;
493 @Override
494 protected Future<Void> doBatchDelete( Transaction txn, Collection<Key> keys) {
495 if (txn != null) {
496 TransactionImpl.ensureTxnActive(txn);
497 InternalTransactionCloudDatastoreV1 txnV1 =
498 InternalTransactionCloudDatastoreV1.getById(txn.getId());
499 for (Key key : keys) {
500 txnV1.deferDelete(key);
502 return new FutureHelper.FakeFuture<Void>(null);
505 CommitRequest.Builder baseReq = CommitRequest.newBuilder();
506 baseReq.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
507 Iterator<CommitRequest.Builder> batches = deleteBatcher.getBatches(keys, baseReq,
508 baseReq.build().getSerializedSize(), true);
509 List<Future<CommitResponse>> futures = deleteBatcher.makeCalls(batches);
510 return new MultiFuture<CommitResponse, Void>(futures) {
511 @Override
512 public Void get() throws InterruptedException, ExecutionException {
513 for (Future<CommitResponse> future : futures) {
514 future.get();
516 return null;
519 @Override
520 public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
521 TimeoutException {
522 for (Future<CommitResponse> future : futures) {
523 future.get(timeout, unit);
525 return null;
531 * This API is specific to sequential IDs, which v1 does not support.
533 @Override
534 public Future<KeyRange> allocateIds(final Key parent, final String kind, long num) {
535 throw new UnsupportedOperationException();
539 * This API is specific to sequential IDs, which v1 does not support.
541 @Override
542 public Future<KeyRangeState> allocateIdRange(final KeyRange range) {
543 throw new UnsupportedOperationException();
547 * Allocates scattered IDs for a list of incomplete keys.
549 protected Future<List<com.google.datastore.v1beta3.Key>> allocateIds(List<Key> keyList) {
550 List<Integer> order = Lists.newArrayListWithCapacity(keyList.size());
551 Iterator<AllocateIdsRequest.Builder> batches = allocateIdsBatcher.getBatches(keyList,
552 AllocateIdsRequest.newBuilder(), 0, true, order);
553 List<Future<AllocateIdsResponse>> futures = allocateIdsBatcher.makeCalls(batches);
555 return new ReorderingMultiFuture<AllocateIdsResponse,
556 List<com.google.datastore.v1beta3.Key>>(futures, order) {
557 @Override
558 protected List<com.google.datastore.v1beta3.Key> aggregate(AllocateIdsResponse batch,
559 Iterator<Integer> indexItr,
560 List<com.google.datastore.v1beta3.Key> result) {
561 for (com.google.datastore.v1beta3.Key key : batch.getKeysList()) {
562 result.set(indexItr.next(), key);
564 return result;
567 @Override
568 protected List<com.google.datastore.v1beta3.Key> initResult(int size) {
569 return Arrays.asList(new com.google.datastore.v1beta3.Key[size]);
574 @Override
575 public Future<Map<Index, IndexState>> getIndexes() {
576 throw new UnsupportedOperationException();
580 * Update a key object with the id in the proto, if one exists.
582 private static void updateKey(com.google.datastore.v1beta3.Key keyV1, Key key) {
583 List<PathElement> pathElements = keyV1.getPathList();
584 if (!pathElements.isEmpty()) {
585 PathElement lastElement = pathElements.get(pathElements.size() - 1);
586 if (lastElement.getIdTypeCase() == IdTypeCase.ID) {
587 key.setId(lastElement.getId());