1.9.30 sync.
[gae.git] / java / src / main / com / google / appengine / api / datastore / AsyncCloudDatastoreV1ServiceImpl.java
blobe1ae9ef517aa6d4c7cc679d29acd449585439af5
1 package com.google.appengine.api.datastore;
3 import static com.google.appengine.api.datastore.DatastoreAttributes.DatastoreType.MASTER_SLAVE;
4 import static com.google.appengine.api.datastore.ReadPolicy.Consistency.EVENTUAL;
6 import com.google.appengine.api.datastore.Batcher.ReorderingMultiFuture;
7 import com.google.appengine.api.datastore.DatastoreService.KeyRangeState;
8 import com.google.appengine.api.datastore.DatastoreServiceConfig.ApiVersion;
9 import com.google.appengine.api.datastore.FutureHelper.MultiFuture;
10 import com.google.appengine.api.datastore.Index.IndexState;
11 import com.google.appengine.api.utils.FutureWrapper;
12 import com.google.common.collect.ImmutableList;
13 import com.google.common.collect.Lists;
14 import com.google.common.collect.Maps;
15 import com.google.datastore.v1beta3.AllocateIdsRequest;
16 import com.google.datastore.v1beta3.AllocateIdsResponse;
17 import com.google.datastore.v1beta3.BeginTransactionRequest;
18 import com.google.datastore.v1beta3.BeginTransactionResponse;
19 import com.google.datastore.v1beta3.CommitRequest;
20 import com.google.datastore.v1beta3.CommitResponse;
21 import com.google.datastore.v1beta3.EntityResult;
22 import com.google.datastore.v1beta3.Key.PathElement;
23 import com.google.datastore.v1beta3.Key.PathElement.IdTypeCase;
24 import com.google.datastore.v1beta3.LookupRequest;
25 import com.google.datastore.v1beta3.LookupResponse;
26 import com.google.datastore.v1beta3.Mutation;
27 import com.google.datastore.v1beta3.MutationResult;
28 import com.google.datastore.v1beta3.ReadOptions;
29 import com.google.protobuf.Message;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.TimeoutException;
43 /**
44 * An implementation of {@link AsyncDatastoreService} using the Cloud Datastore v1 API.
46 class AsyncCloudDatastoreV1ServiceImpl extends BaseAsyncDatastoreServiceImpl {
48 /**
49 * A base batcher for Cloud Datastore v1 operations executed in the context of an {@link
50 * AsyncCloudDatastoreV1ServiceImpl}.
52 * @param <S> the response message type
53 * @param <R> the request message builder type
54 * @param <F> the Java specific representation of a value
55 * @param <T> the proto representation of value
57 private abstract class V1Batcher<S extends Message, R extends Message.Builder, F,
58 T extends Message> extends BaseRpcBatcher<S, R, F, T> {
59 @Override
60 @SuppressWarnings("unchecked")
61 final R newBatch(R baseBatch) {
62 return (R) baseBatch.clone();
66 private final V1Batcher<CommitResponse, CommitRequest.Builder, Key, Mutation> deleteBatcher =
67 new V1Batcher<CommitResponse, CommitRequest.Builder, Key, Mutation>() {
68 @Override
69 void addToBatch(Mutation mutation, CommitRequest.Builder batch) {
70 batch.addMutations(mutation);
73 @Override
74 int getMaxCount() {
75 return datastoreServiceConfig.maxBatchWriteEntities;
78 @Override
79 protected Future<CommitResponse> makeCall(CommitRequest.Builder batch) {
80 return datastoreProxy.commit(batch.build());
83 @Override
84 final Object getGroup(Key key) {
85 return key.getRootKey();
88 @Override
89 final Mutation toPb(Key value) {
90 return Mutation.newBuilder()
91 .setDelete(DataTypeTranslator.toV1Key(value))
92 .build();
96 private final V1Batcher<LookupResponse, LookupRequest.Builder, Key,
97 com.google.datastore.v1beta3.Key> lookupByKeyBatcher =
98 new V1Batcher<LookupResponse, LookupRequest.Builder, Key,
99 com.google.datastore.v1beta3.Key>() {
100 @Override
101 void addToBatch(com.google.datastore.v1beta3.Key key, LookupRequest.Builder batch) {
102 batch.addKeys(key);
105 @Override
106 int getMaxCount() {
107 return datastoreServiceConfig.maxBatchReadEntities;
110 @Override
111 protected Future<LookupResponse> makeCall(LookupRequest.Builder batch) {
112 return datastoreProxy.lookup(batch.build());
115 @Override
116 final Object getGroup(Key key) {
117 return key.getRootKey();
120 @Override
121 final com.google.datastore.v1beta3.Key toPb(Key value) {
122 return DataTypeTranslator.toV1Key(value).build();
126 private final V1Batcher<LookupResponse, LookupRequest.Builder, com.google.datastore.v1beta3.Key,
127 com.google.datastore.v1beta3.Key>
128 lookupByPbBatcher =
129 new V1Batcher<LookupResponse, LookupRequest.Builder, com.google.datastore.v1beta3.Key,
130 com.google.datastore.v1beta3.Key>() {
131 @Override
132 void addToBatch(com.google.datastore.v1beta3.Key key, LookupRequest.Builder batch) {
133 batch.addKeys(key);
136 @Override
137 int getMaxCount() {
138 return datastoreServiceConfig.maxBatchReadEntities;
141 @Override
142 protected Future<LookupResponse> makeCall(LookupRequest.Builder batch) {
143 return datastoreProxy.lookup(batch.build());
146 @Override
147 final Object getGroup(com.google.datastore.v1beta3.Key key) {
148 return key.getPath(0);
151 @Override
152 final com.google.datastore.v1beta3.Key toPb(com.google.datastore.v1beta3.Key value) {
153 return value;
157 private final V1Batcher<CommitResponse, CommitRequest.Builder, Entity, Mutation>
158 putBatcher = new V1Batcher<CommitResponse, CommitRequest.Builder, Entity, Mutation>() {
159 @Override
160 void addToBatch(Mutation mutation, CommitRequest.Builder batch) {
161 batch.addMutations(mutation);
164 @Override
165 int getMaxCount() {
166 return datastoreServiceConfig.maxBatchWriteEntities;
169 @Override
170 protected Future<CommitResponse> makeCall(CommitRequest.Builder batch) {
171 return datastoreProxy.commit(batch.build());
174 @Override
175 final Object getGroup(Entity value) {
176 return value.getKey().getRootKey();
179 @Override
180 final Mutation toPb(Entity value) {
181 return Mutation.newBuilder()
182 .setUpsert(DataTypeTranslator.toV1Entity(value))
183 .build();
187 private final V1Batcher<AllocateIdsResponse, AllocateIdsRequest.Builder, Key,
188 com.google.datastore.v1beta3.Key>
189 allocateIdsBatcher =
190 new V1Batcher<AllocateIdsResponse, AllocateIdsRequest.Builder, Key,
191 com.google.datastore.v1beta3.Key>() {
192 @Override
193 void addToBatch(com.google.datastore.v1beta3.Key key, AllocateIdsRequest.Builder batch) {
194 batch.addKeys(key);
197 @Override
198 int getMaxCount() {
199 return datastoreServiceConfig.maxBatchAllocateIdKeys;
202 @Override
203 protected Future<AllocateIdsResponse> makeCall(AllocateIdsRequest.Builder batch) {
204 return datastoreProxy.allocateIds(batch.build());
207 @Override
208 final Object getGroup(Key key) {
209 Key parent = key.getParent();
210 if (parent == null) {
211 return PathElement.getDefaultInstance();
212 } else {
213 return DataTypeTranslator.toV1Key(parent).getPath(0);
217 @Override
218 final com.google.datastore.v1beta3.Key toPb(Key value) {
219 return DataTypeTranslator.toV1Key(value).build();
223 private final CloudDatastoreV1Proxy datastoreProxy;
225 public AsyncCloudDatastoreV1ServiceImpl(
226 DatastoreServiceConfig datastoreServiceConfig, CloudDatastoreV1Proxy datastoreProxy,
227 TransactionStack defaultTxnProvider) {
228 super(datastoreServiceConfig, defaultTxnProvider,
229 new QueryRunnerCloudDatastoreV1(datastoreServiceConfig, datastoreProxy));
230 this.datastoreProxy = datastoreProxy;
233 @Override
234 protected TransactionImpl.InternalTransaction doBeginTransaction(TransactionOptions options) {
235 BeginTransactionRequest.Builder request = BeginTransactionRequest.newBuilder();
237 Future<BeginTransactionResponse> future = datastoreProxy.beginTransaction(request.build());
239 ApiVersion apiVersion = datastoreServiceConfig.getApiVersion();
240 switch (apiVersion) {
241 case CLOUD_DATASTORE_V1_VIA_API_PROXY:
242 case CLOUD_DATASTORE_V1_REMOTE:
243 return InternalTransactionCloudDatastoreV1.create(datastoreProxy, future);
244 default:
245 throw new IllegalStateException("Unsupported api version: " + apiVersion);
249 @Override
250 protected Future<Map<Key, Entity>> doBatchGet( Transaction txn,
251 final Set<Key> keysToGet, final Map<Key, Entity> resultMap) {
252 final LookupRequest.Builder baseReq = LookupRequest.newBuilder();
253 ReadOptions.Builder readOptionsBuilder = baseReq.getReadOptionsBuilder();
254 if (txn != null) {
255 TransactionImpl.ensureTxnActive(txn);
256 readOptionsBuilder.setTransaction(
257 InternalTransactionCloudDatastoreV1.get(txn).getTransactionBytes());
258 } else if (datastoreServiceConfig.getReadPolicy().getConsistency() == EVENTUAL) {
259 readOptionsBuilder.setReadConsistency(ReadOptions.ReadConsistency.EVENTUAL);
260 } else {
261 baseReq.clearReadOptions();
264 final boolean shouldUseMultipleBatches = getDatastoreType() != MASTER_SLAVE && txn == null
265 && datastoreServiceConfig.getReadPolicy().getConsistency() != EVENTUAL;
267 Iterator<LookupRequest.Builder> batches = lookupByKeyBatcher.getBatches(keysToGet, baseReq,
268 baseReq.build().getSerializedSize(), shouldUseMultipleBatches);
269 List<Future<LookupResponse>> futures = lookupByKeyBatcher.makeCalls(batches);
271 return registerInTransaction(txn, new MultiFuture<LookupResponse, Map<Key, Entity>>(futures) {
273 * A Map from a Key without an app id specified to the corresponding Key that the
274 * user requested. This is a workaround for the Remote API to support matching requested
275 * Keys to Entities that may be from a different app id.
277 private Map<com.google.datastore.v1beta3.Key, Key> keyMapIgnoringAppId;
279 @Override
280 public Map<Key, Entity> get() throws InterruptedException, ExecutionException {
281 try {
282 aggregate(futures, null, null);
283 } catch (TimeoutException e) {
284 throw new RuntimeException(e);
286 return resultMap;
289 @Override
290 public Map<Key, Entity> get(long timeout, TimeUnit unit)
291 throws InterruptedException, ExecutionException, TimeoutException {
292 aggregate(futures, timeout, unit);
293 return resultMap;
297 * Aggregates the results of the given Futures and issues (synchronous) followup requests if
298 * any results were deferred.
300 * @param currentFutures the Futures corresponding to the batches of the initial
301 * LookupRequests.
302 * @param timeout the timeout to use while waiting on the Future, or null for none.
303 * @param timeoutUnit the unit of the timeout, or null for none.
305 private void aggregate(
306 Iterable<Future<LookupResponse>> currentFutures, Long timeout, TimeUnit timeoutUnit)
307 throws ExecutionException, InterruptedException, TimeoutException {
308 while (true) {
309 List<com.google.datastore.v1beta3.Key> deferredKeys = Lists.newArrayList();
311 for (Future<LookupResponse> currentFuture : currentFutures) {
312 LookupResponse resp = getFutureWithOptionalTimeout(currentFuture, timeout, timeoutUnit);
313 addEntitiesToResultMap(resp);
314 deferredKeys.addAll(resp.getDeferredList());
317 if (deferredKeys.isEmpty()) {
318 break;
321 Iterator<LookupRequest.Builder> followupBatches = lookupByPbBatcher.getBatches(
322 deferredKeys, baseReq, baseReq.build().getSerializedSize(), shouldUseMultipleBatches);
323 currentFutures = lookupByPbBatcher.makeCalls(followupBatches);
328 * Convenience method to get the result of a Future and optionally specify a timeout.
330 * @param future the Future to get.
331 * @param timeout the timeout to use while waiting on the Future, or null for none.
332 * @param timeoutUnit the unit of the timeout, or null for none.
333 * @return the result of the Future.
334 * @throws TimeoutException will only ever be thrown if a timeout is provided.
336 private LookupResponse getFutureWithOptionalTimeout(
337 Future<LookupResponse> future, Long timeout, TimeUnit timeoutUnit)
338 throws ExecutionException, InterruptedException, TimeoutException {
339 if (timeout == null) {
340 return future.get();
341 } else {
342 return future.get(timeout, timeoutUnit);
347 * Adds the Entities from the LookupResponse to the resultMap. Will omit Keys that were
348 * missing. Handles Keys with different App Ids from the Entity.Key. See
349 * {@link #findKeyFromRequestIgnoringAppId}.
351 private void addEntitiesToResultMap(LookupResponse response) {
352 for (EntityResult entityResult : response.getFoundList()) {
353 Entity responseEntity = DataTypeTranslator.toEntity(entityResult.getEntity());
354 Key responseKey = responseEntity.getKey();
356 if (!keysToGet.contains(responseKey)) {
357 responseKey = findKeyFromRequestIgnoringAppId(entityResult.getEntity().getKey());
359 resultMap.put(responseKey, responseEntity);
364 * This is a hack to support calls going through the Remote API. The problem is:
366 * The requested Key may have a local app id.
367 * The returned Entity may have a remote app id.
369 * In this case, we want to return a Map.Entry with {LocalKey, RemoteEntity}. This way, the
370 * user can always do map.get(keyFromRequest).
372 * This method will find the corresponding requested LocalKey for a RemoteKey by ignoring the
373 * AppId field.
375 * Note that we used to be able to rely on the order of the Response Entities matching the
376 * order of Request Keys. We can no longer do so with the addition of Deferred results.
378 * @param keyFromResponse the key from the Response that did not match any of the requested
379 * Keys.
380 * @return the Key from the request that corresponds to the given Key from the Response
381 * (ignoring AppId.)
383 private Key findKeyFromRequestIgnoringAppId(
384 com.google.datastore.v1beta3.Key keyFromResponse) {
385 if (keyMapIgnoringAppId == null) {
386 keyMapIgnoringAppId = Maps.newHashMap();
387 for (Key requestKey : keysToGet) {
388 com.google.datastore.v1beta3.Key.Builder requestKeyAsRefWithoutApp =
389 DataTypeTranslator.toV1Key(requestKey);
390 requestKeyAsRefWithoutApp.getPartitionIdBuilder().clearProjectId();
391 keyMapIgnoringAppId.put(requestKeyAsRefWithoutApp.build(), requestKey);
395 com.google.datastore.v1beta3.Key.Builder keyBuilder = keyFromResponse.toBuilder();
396 keyBuilder.getPartitionIdBuilder().clearProjectId();
397 Key result = keyMapIgnoringAppId.get(keyBuilder.build());
398 if (result == null) {
399 throw new DatastoreFailureException("Internal error");
401 return result;
406 @Override
407 protected Future<List<Key>> doBatchPut( final Transaction txn,
408 final List<Entity> entities) {
409 if (txn == null) {
410 CommitRequest.Builder baseReq = CommitRequest.newBuilder();
411 baseReq.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
412 List<Integer> order = Lists.newArrayListWithCapacity(entities.size());
413 Iterator<CommitRequest.Builder> batches = putBatcher.getBatches(entities, baseReq,
414 baseReq.build().getSerializedSize(), true, order);
415 List<Future<CommitResponse>> futures = putBatcher.makeCalls(batches);
417 return new ReorderingMultiFuture<CommitResponse, List<Key>>(futures, order) {
418 @Override
419 protected List<Key> aggregate(CommitResponse intermediateResult, Iterator<Integer> indexItr,
420 List<Key> result) {
421 for (MutationResult mutationResult : intermediateResult.getMutationResultsList()) {
422 int index = indexItr.next();
423 Key key = entities.get(index).getKey();
424 if (mutationResult.hasKey()) {
425 List<PathElement> pathElements =
426 mutationResult.getKey().getPathList();
427 key.setId(pathElements.get(pathElements.size() - 1).getId());
429 result.set(index, key);
431 return result;
434 @Override
435 protected List<Key> initResult(int size) {
436 List<Key> keyList = Lists.newArrayListWithCapacity(size);
437 keyList.addAll(Collections.<Key>nCopies(size, null));
438 return keyList;
443 TransactionImpl.ensureTxnActive(txn);
444 final InternalTransactionCloudDatastoreV1 txnV1 = InternalTransactionCloudDatastoreV1.get(txn);
446 ImmutableList.Builder<Key> keyListBuilder = ImmutableList.builder();
447 final List<Key> incompleteKeys = Lists.newArrayList();
448 final List<com.google.datastore.v1beta3.Entity.Builder> incompleteEntityBldrs =
449 Lists.newArrayList();
450 for (Entity entity : entities) {
451 Key key = entity.getKey();
452 keyListBuilder.add(key);
453 if (key.isComplete()) {
454 txnV1.deferPut(entity);
455 } else {
456 com.google.datastore.v1beta3.Entity.Builder entityV1 =
457 com.google.datastore.v1beta3.Entity.newBuilder();
458 DataTypeTranslator.addPropertiesToPb(entity.getPropertyMap(), entityV1);
459 incompleteEntityBldrs.add(entityV1);
460 incompleteKeys.add(key);
463 final List<Key> allKeys = keyListBuilder.build();
465 if (incompleteKeys.isEmpty()) {
466 return new FutureHelper.FakeFuture<List<Key>>(allKeys);
468 return registerInTransaction(txn,
469 new FutureWrapper<List<com.google.datastore.v1beta3.Key>,
470 List<Key>>(allocateIds(incompleteKeys)) {
471 @Override
472 protected List<Key> wrap(List<com.google.datastore.v1beta3.Key> completedKeyPbs) {
473 Iterator<com.google.datastore.v1beta3.Entity.Builder> entityPbBldrIt =
474 incompleteEntityBldrs.iterator();
475 Iterator<Key> incompleteKeysIt = incompleteKeys.iterator();
476 for (com.google.datastore.v1beta3.Key keyV1 : completedKeyPbs) {
477 updateKey(keyV1, incompleteKeysIt.next());
478 txnV1.deferPut(entityPbBldrIt.next().setKey(keyV1));
480 return allKeys;
483 @Override
484 protected Throwable convertException(Throwable cause) {
485 return cause;
490 @Override
491 protected Future<Void> doBatchDelete( Transaction txn, Collection<Key> keys) {
492 if (txn != null) {
493 TransactionImpl.ensureTxnActive(txn);
494 InternalTransactionCloudDatastoreV1 txnV1 = InternalTransactionCloudDatastoreV1.get(txn);
495 for (Key key : keys) {
496 txnV1.deferDelete(key);
498 return new FutureHelper.FakeFuture<Void>(null);
501 CommitRequest.Builder baseReq = CommitRequest.newBuilder();
502 baseReq.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
503 Iterator<CommitRequest.Builder> batches = deleteBatcher.getBatches(keys, baseReq,
504 baseReq.build().getSerializedSize(), true);
505 List<Future<CommitResponse>> futures = deleteBatcher.makeCalls(batches);
506 return new MultiFuture<CommitResponse, Void>(futures) {
507 @Override
508 public Void get() throws InterruptedException, ExecutionException {
509 for (Future<CommitResponse> future : futures) {
510 future.get();
512 return null;
515 @Override
516 public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
517 TimeoutException {
518 for (Future<CommitResponse> future : futures) {
519 future.get(timeout, unit);
521 return null;
527 * This API is specific to sequential IDs, which Cloud Datastore v1 does not support.
529 @Override
530 public Future<KeyRange> allocateIds(final Key parent, final String kind, long num) {
531 throw new UnsupportedOperationException();
535 * This API is specific to sequential IDs, which Cloud Datastore v1 does not support.
537 @Override
538 public Future<KeyRangeState> allocateIdRange(final KeyRange range) {
539 throw new UnsupportedOperationException();
543 * Allocates scattered IDs for a list of incomplete keys.
545 protected Future<List<com.google.datastore.v1beta3.Key>> allocateIds(List<Key> keyList) {
546 List<Integer> order = Lists.newArrayListWithCapacity(keyList.size());
547 Iterator<AllocateIdsRequest.Builder> batches = allocateIdsBatcher.getBatches(keyList,
548 AllocateIdsRequest.newBuilder(), 0, true, order);
549 List<Future<AllocateIdsResponse>> futures = allocateIdsBatcher.makeCalls(batches);
551 return new ReorderingMultiFuture<AllocateIdsResponse,
552 List<com.google.datastore.v1beta3.Key>>(futures, order) {
553 @Override
554 protected List<com.google.datastore.v1beta3.Key> aggregate(AllocateIdsResponse batch,
555 Iterator<Integer> indexItr,
556 List<com.google.datastore.v1beta3.Key> result) {
557 for (com.google.datastore.v1beta3.Key key : batch.getKeysList()) {
558 result.set(indexItr.next(), key);
560 return result;
563 @Override
564 protected List<com.google.datastore.v1beta3.Key> initResult(int size) {
565 return Arrays.asList(new com.google.datastore.v1beta3.Key[size]);
570 @Override
571 public Future<Map<Index, IndexState>> getIndexes() {
572 throw new UnsupportedOperationException();
576 * Update a {@link Key} with the id from a key proto, if it is populated.
578 private static void updateKey(com.google.datastore.v1beta3.Key keyV1, Key key) {
579 List<PathElement> pathElements = keyV1.getPathList();
580 if (!pathElements.isEmpty()) {
581 PathElement lastElement = pathElements.get(pathElements.size() - 1);
582 if (lastElement.getIdTypeCase() == IdTypeCase.ID) {
583 key.setId(lastElement.getId());