Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / datastore / AsyncDatastoreV4ServiceImpl.java
blobc71a5393acc5255009613e8cfe15292fa5c1212b
1 package com.google.appengine.api.datastore;
3 import static com.google.appengine.api.datastore.DatastoreAttributes.DatastoreType.MASTER_SLAVE;
4 import static com.google.appengine.api.datastore.ReadPolicy.Consistency.EVENTUAL;
6 import com.google.appengine.api.datastore.Batcher.ReorderingMultiFuture;
7 import com.google.appengine.api.datastore.DatastoreService.KeyRangeState;
8 import com.google.appengine.api.datastore.DatastoreServiceConfig.ApiVersion;
9 import com.google.appengine.api.datastore.FutureHelper.MultiFuture;
10 import com.google.appengine.api.datastore.Index.IndexState;
11 import com.google.appengine.api.utils.FutureWrapper;
12 import com.google.apphosting.datastore.DatastoreV4.AllocateIdsRequest;
13 import com.google.apphosting.datastore.DatastoreV4.AllocateIdsResponse;
14 import com.google.apphosting.datastore.DatastoreV4.BeginTransactionRequest;
15 import com.google.apphosting.datastore.DatastoreV4.BeginTransactionResponse;
16 import com.google.apphosting.datastore.DatastoreV4.CommitRequest;
17 import com.google.apphosting.datastore.DatastoreV4.CommitResponse;
18 import com.google.apphosting.datastore.DatastoreV4.EntityResult;
19 import com.google.apphosting.datastore.DatastoreV4.LookupRequest;
20 import com.google.apphosting.datastore.DatastoreV4.LookupResponse;
21 import com.google.apphosting.datastore.DatastoreV4.Mutation;
22 import com.google.apphosting.datastore.DatastoreV4.MutationResult;
23 import com.google.apphosting.datastore.DatastoreV4.ReadOptions;
24 import com.google.apphosting.datastore.EntityV4;
25 import com.google.apphosting.datastore.EntityV4.Key.PathElement;
26 import com.google.common.collect.ImmutableList;
27 import com.google.common.collect.Lists;
28 import com.google.common.collect.Maps;
29 import com.google.protobuf.Message;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.TimeoutException;
/**
 * An implementation of AsyncDatastoreService using the DatastoreV4 API.
 */
46 class AsyncDatastoreV4ServiceImpl extends BaseAsyncDatastoreServiceImpl {
48 /**
49 * A base batcher for DatastoreV4 operations executed in the context of an {@link
50 * AsyncDatastoreV4ServiceImpl}.
51 * @param <S> the response message type
52 * @param <R> the request message builder type
53 * @param <F> the Java specific representation of a value
54 * @param <T> the proto representation of value
56 private abstract class V4Batcher<S extends Message, R extends Message.Builder, F,
57 T extends Message> extends BaseRpcBatcher<S, R, F, T> {
58 @Override
59 @SuppressWarnings("unchecked")
60 final R newBatch(R baseBatch) {
61 return (R) baseBatch.clone();
65 private final V4Batcher<CommitResponse, CommitRequest.Builder, Key, Mutation> deleteBatcher =
66 new V4Batcher<CommitResponse, CommitRequest.Builder, Key, Mutation>() {
67 @Override
68 void addToBatch(Mutation mutation, CommitRequest.Builder batch) {
69 batch.addMutation(mutation);
72 @Override
73 int getMaxCount() {
74 return datastoreServiceConfig.maxBatchWriteEntities;
77 @Override
78 protected Future<CommitResponse> makeCall(CommitRequest.Builder batch) {
79 return datastoreProxy.commit(batch.build());
82 @Override
83 final Object getGroup(Key key) {
84 return key.getRootKey();
87 @Override
88 final Mutation toPb(Key value) {
89 return Mutation.newBuilder()
90 .setOp(Mutation.Operation.DELETE)
91 .setKey(DataTypeTranslator.toV4Key(value))
92 .build();
96 private final V4Batcher<LookupResponse, LookupRequest.Builder, Key, EntityV4.Key>
97 lookupByKeyBatcher =
98 new V4Batcher<LookupResponse, LookupRequest.Builder, Key, EntityV4.Key>() {
99 @Override
100 void addToBatch(EntityV4.Key key, LookupRequest.Builder batch) {
101 batch.addKey(key);
104 @Override
105 int getMaxCount() {
106 return datastoreServiceConfig.maxBatchReadEntities;
109 @Override
110 protected Future<LookupResponse> makeCall(LookupRequest.Builder batch) {
111 return datastoreProxy.lookup(batch.build());
114 @Override
115 final Object getGroup(Key key) {
116 return key.getRootKey();
119 @Override
120 final EntityV4.Key toPb(Key value) {
121 return DataTypeTranslator.toV4Key(value).build();
125 private final V4Batcher<LookupResponse, LookupRequest.Builder, EntityV4.Key, EntityV4.Key>
126 lookupByPbBatcher =
127 new V4Batcher<LookupResponse, LookupRequest.Builder, EntityV4.Key, EntityV4.Key>() {
128 @Override
129 void addToBatch(EntityV4.Key key, LookupRequest.Builder batch) {
130 batch.addKey(key);
133 @Override
134 int getMaxCount() {
135 return datastoreServiceConfig.maxBatchReadEntities;
138 @Override
139 protected Future<LookupResponse> makeCall(LookupRequest.Builder batch) {
140 return datastoreProxy.lookup(batch.build());
143 @Override
144 final Object getGroup(EntityV4.Key key) {
145 return key.getPathElement(0);
148 @Override
149 final EntityV4.Key toPb(EntityV4.Key value) {
150 return value;
154 private final V4Batcher<CommitResponse, CommitRequest.Builder, Entity, Mutation>
155 putBatcher = new V4Batcher<CommitResponse, CommitRequest.Builder, Entity, Mutation>() {
156 @Override
157 void addToBatch(Mutation mutation, CommitRequest.Builder batch) {
158 batch.addMutation(mutation);
161 @Override
162 int getMaxCount() {
163 return datastoreServiceConfig.maxBatchWriteEntities;
166 @Override
167 protected Future<CommitResponse> makeCall(CommitRequest.Builder batch) {
168 return datastoreProxy.commit(batch.build());
171 @Override
172 final Object getGroup(Entity value) {
173 return value.getKey().getRootKey();
176 @Override
177 final Mutation toPb(Entity value) {
178 return Mutation.newBuilder()
179 .setOp(Mutation.Operation.UPSERT)
180 .setEntity(DataTypeTranslator.toV4Entity(value))
181 .build();
185 private final V4Batcher<AllocateIdsResponse, AllocateIdsRequest.Builder, Key, EntityV4.Key>
186 allocateIdsBatcher =
187 new V4Batcher<AllocateIdsResponse, AllocateIdsRequest.Builder, Key, EntityV4.Key>() {
188 @Override
189 void addToBatch(EntityV4.Key key, AllocateIdsRequest.Builder batch) {
190 batch.addAllocate(key);
193 @Override
194 int getMaxCount() {
195 return datastoreServiceConfig.maxBatchAllocateIdKeys;
198 @Override
199 protected Future<AllocateIdsResponse> makeCall(AllocateIdsRequest.Builder batch) {
200 return datastoreProxy.allocateIds(batch.build());
203 @Override
204 final Object getGroup(Key key) {
205 Key parent = key.getParent();
206 if (parent == null) {
207 return EntityV4.Key.PathElement.getDefaultInstance();
208 } else {
209 return DataTypeTranslator.toV4Key(parent).getPathElement(0);
213 @Override
214 final EntityV4.Key toPb(Key value) {
215 return DataTypeTranslator.toV4Key(value).build();
/** Proxy used by this class to issue commit, lookup, allocateIds and beginTransaction calls. */
private final DatastoreV4Proxy datastoreProxy;

/**
 * Creates the service. The configuration and transaction provider are handed to the base
 * class together with a {@link QueryRunnerV4} built on the same configuration and proxy.
 *
 * @param datastoreServiceConfig the service configuration
 * @param datastoreProxy the proxy through which V4 calls are made
 * @param defaultTxnProvider the transaction stack passed on to the base class
 */
public AsyncDatastoreV4ServiceImpl(
    DatastoreServiceConfig datastoreServiceConfig,
    DatastoreV4Proxy datastoreProxy,
    TransactionStack defaultTxnProvider) {
  super(
      datastoreServiceConfig,
      defaultTxnProvider,
      new QueryRunnerV4(datastoreServiceConfig, datastoreProxy));
  this.datastoreProxy = datastoreProxy;
}
229 @Override
230 protected TransactionImpl.InternalTransaction doBeginTransaction(TransactionOptions options) {
231 BeginTransactionRequest.Builder request = BeginTransactionRequest.newBuilder();
232 request.setCrossGroup(options.isXG());
234 Future<BeginTransactionResponse> future = datastoreProxy.beginTransaction(request.build());
236 ApiVersion apiVersion = datastoreServiceConfig.getApiVersion();
237 switch (apiVersion) {
238 case CLOUD_DATASTORE:
239 return InternalTransactionCloudDatastore.create(datastoreProxy, future);
240 case V4:
241 return InternalTransactionV4.create(datastoreProxy, future);
242 default:
243 throw new IllegalStateException("Unsupported api version: " + apiVersion);
247 @Override
248 protected Future<Map<Key, Entity>> doBatchGet( Transaction txn,
249 final Set<Key> keysToGet, final Map<Key, Entity> resultMap) {
250 final LookupRequest.Builder baseReq = LookupRequest.newBuilder();
251 ReadOptions.Builder readOptionsBuilder = baseReq.getReadOptionsBuilder();
252 if (txn != null) {
253 TransactionImpl.ensureTxnActive(txn);
254 readOptionsBuilder.setTransaction(InternalTransactionV4.getById(txn.getId()).getHandle());
255 } else if (datastoreServiceConfig.getReadPolicy().getConsistency() == EVENTUAL) {
256 readOptionsBuilder.setReadConsistency(ReadOptions.ReadConsistency.EVENTUAL);
257 } else {
258 baseReq.clearReadOptions();
261 final boolean shouldUseMultipleBatches = getDatastoreType() != MASTER_SLAVE && txn == null
262 && datastoreServiceConfig.getReadPolicy().getConsistency() != EVENTUAL;
264 Iterator<LookupRequest.Builder> batches = lookupByKeyBatcher.getBatches(keysToGet, baseReq,
265 baseReq.build().getSerializedSize(), shouldUseMultipleBatches);
266 List<Future<LookupResponse>> futures = lookupByKeyBatcher.makeCalls(batches);
268 return registerInTransaction(txn, new MultiFuture<LookupResponse, Map<Key, Entity>>(futures) {
270 * A Map from an EntityV4.Key without an App Id specified to the corresponding Key that the
271 * user requested. This is a workaround for the Remote API to support matching requested
272 * Keys to Entities that may be from a different App Id.
274 private Map<EntityV4.Key, Key> keyMapIgnoringAppId;
276 @Override
277 public Map<Key, Entity> get() throws InterruptedException, ExecutionException {
278 try {
279 aggregate(futures, null, null);
280 } catch (TimeoutException e) {
281 throw new RuntimeException(e);
283 return resultMap;
286 @Override
287 public Map<Key, Entity> get(long timeout, TimeUnit unit)
288 throws InterruptedException, ExecutionException, TimeoutException {
289 aggregate(futures, timeout, unit);
290 return resultMap;
294 * Aggregates the results of the given Futures and issues (synchronous) followup requests if
295 * any results were deferred.
297 * @param currentFutures the Futures corresponding to the batches of the initial
298 * LookupRequests.
299 * @param timeout the timeout to use while waiting on the Future, or null for none.
300 * @param timeoutUnit the unit of the timeout, or null for none.
302 private void aggregate(
303 Iterable<Future<LookupResponse>> currentFutures, Long timeout, TimeUnit timeoutUnit)
304 throws ExecutionException, InterruptedException, TimeoutException {
305 while (true) {
306 List<EntityV4.Key> deferredKeys = Lists.newArrayList();
308 for (Future<LookupResponse> currentFuture : currentFutures) {
309 LookupResponse resp = getFutureWithOptionalTimeout(currentFuture, timeout, timeoutUnit);
310 addEntitiesToResultMap(resp);
311 deferredKeys.addAll(resp.getDeferredList());
314 if (deferredKeys.isEmpty()) {
315 break;
318 Iterator<LookupRequest.Builder> followupBatches = lookupByPbBatcher.getBatches(
319 deferredKeys, baseReq, baseReq.build().getSerializedSize(), shouldUseMultipleBatches);
320 currentFutures = lookupByPbBatcher.makeCalls(followupBatches);
325 * Convenience method to get the result of a Future and optionally specify a timeout.
327 * @param future the Future to get.
328 * @param timeout the timeout to use while waiting on the Future, or null for none.
329 * @param timeoutUnit the unit of the timeout, or null for none.
330 * @return the result of the Future.
331 * @throws TimeoutException will only ever be thrown if a timeout is provided.
333 private LookupResponse getFutureWithOptionalTimeout(
334 Future<LookupResponse> future, Long timeout, TimeUnit timeoutUnit)
335 throws ExecutionException, InterruptedException, TimeoutException {
336 if (timeout == null) {
337 return future.get();
338 } else {
339 return future.get(timeout, timeoutUnit);
344 * Adds the Entities from the LookupResponse to the resultMap. Will omit Keys that were
345 * missing. Handles Keys with different App Ids from the Entity.Key. See
346 * {@link #findKeyFromRequestIgnoringAppId(EntityV4.Key)}
348 private void addEntitiesToResultMap(LookupResponse response) {
349 for (EntityResult entityResult : response.getFoundList()) {
350 Entity responseEntity = DataTypeTranslator.toEntity(entityResult.getEntity());
351 Key responseKey = responseEntity.getKey();
353 if (!keysToGet.contains(responseKey)) {
354 responseKey = findKeyFromRequestIgnoringAppId(entityResult.getEntity().getKey());
356 resultMap.put(responseKey, responseEntity);
361 * This is a hack to support calls going through the Remote API. The problem is:
363 * The requested Key may have a local app id.
364 * The returned Entity may have a remote app id.
366 * In this case, we want to return a Map.Entry with {LocalKey, RemoteEntity}. This way, the
367 * user can always do map.get(keyFromRequest).
369 * This method will find the corresponding requested LocalKey for a RemoteKey by ignoring the
370 * AppId field.
372 * Note that we used to be able to rely on the order of the Response Entities matching the
373 * order of Request Keys. We can no longer do so with the addition of Deferred results.
375 * @param keyFromResponse the key from the Response that did not match any of the requested
376 * Keys.
377 * @return the Key from the request that corresponds to the given Key from the Response
378 * (ignoring AppId.)
380 private Key findKeyFromRequestIgnoringAppId(EntityV4.Key keyFromResponse) {
381 if (keyMapIgnoringAppId == null) {
382 keyMapIgnoringAppId = Maps.newHashMap();
383 for (Key requestKey : keysToGet) {
384 EntityV4.Key.Builder requestKeyAsRefWithoutApp = DataTypeTranslator.toV4Key(requestKey);
385 requestKeyAsRefWithoutApp.getPartitionIdBuilder().clearDatasetId();
386 keyMapIgnoringAppId.put(requestKeyAsRefWithoutApp.build(), requestKey);
390 EntityV4.Key.Builder keyBuilder = keyFromResponse.toBuilder();
391 keyBuilder.getPartitionIdBuilder().clearDatasetId();
392 Key result = keyMapIgnoringAppId.get(keyBuilder.build());
393 if (result == null) {
394 throw new DatastoreFailureException("Internal error");
396 return result;
401 @Override
402 protected Future<List<Key>> doBatchPut( final Transaction txn,
403 final List<Entity> entities) {
404 if (txn == null) {
405 CommitRequest.Builder baseReq = CommitRequest.newBuilder();
406 baseReq.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
407 List<Integer> order = Lists.newArrayListWithCapacity(entities.size());
408 Iterator<CommitRequest.Builder> batches = putBatcher.getBatches(entities, baseReq,
409 baseReq.build().getSerializedSize(), true, order);
410 List<Future<CommitResponse>> futures = putBatcher.makeCalls(batches);
412 return new ReorderingMultiFuture<CommitResponse, List<Key>>(futures, order) {
413 @Override
414 protected List<Key> aggregate(CommitResponse intermediateResult, Iterator<Integer> indexItr,
415 List<Key> result) {
416 for (MutationResult mutationResult : intermediateResult.getMutationResultList()) {
417 int index = indexItr.next();
418 Key key = entities.get(index).getKey();
419 if (mutationResult.hasKey()) {
420 List<EntityV4.Key.PathElement> pathElements =
421 mutationResult.getKey().getPathElementList();
422 key.setId(pathElements.get(pathElements.size() - 1).getId());
424 result.set(index, key);
426 return result;
429 @Override
430 protected List<Key> initResult(int size) {
431 List<Key> keyList = Lists.newArrayListWithCapacity(size);
432 keyList.addAll(Collections.<Key>nCopies(size, null));
433 return keyList;
438 TransactionImpl.ensureTxnActive(txn);
439 final BaseInternalTransactionV4<?> v4txn = InternalTransactionV4.getById(txn.getId());
441 ImmutableList.Builder<Key> keyListBuilder = ImmutableList.builder();
442 final List<Key> incompleteKeys = Lists.newArrayList();
443 final List<EntityV4.Entity.Builder> incompleteEntityBldrs = Lists.newArrayList();
444 for (Entity entity : entities) {
445 Key key = entity.getKey();
446 keyListBuilder.add(key);
447 if (key.isComplete()) {
448 v4txn.deferPut(entity);
449 } else {
450 EntityV4.Entity.Builder v4Entity = EntityV4.Entity.newBuilder();
451 DataTypeTranslator.addPropertiesToPb(entity.getPropertyMap(), v4Entity);
452 incompleteEntityBldrs.add(v4Entity);
453 incompleteKeys.add(key);
456 final List<Key> allKeys = keyListBuilder.build();
458 if (incompleteKeys.isEmpty()) {
459 return new FutureHelper.FakeFuture<List<Key>>(allKeys);
461 return registerInTransaction(txn,
462 new FutureWrapper<List<EntityV4.Key>, List<Key>>(allocateIds(incompleteKeys)) {
463 @Override
464 protected List<Key> wrap(List<EntityV4.Key> completedKeyPbs) {
465 Iterator<EntityV4.Entity.Builder> entityPbBldrIt = incompleteEntityBldrs.iterator();
466 Iterator<Key> incompleteKeysIt = incompleteKeys.iterator();
467 for (EntityV4.Key v4Key : completedKeyPbs) {
468 updateKey(v4Key, incompleteKeysIt.next());
469 v4txn.deferPut(entityPbBldrIt.next().setKey(v4Key));
471 return allKeys;
474 @Override
475 protected Throwable convertException(Throwable cause) {
476 return cause;
481 @Override
482 protected Future<Void> doBatchDelete( Transaction txn, Collection<Key> keys) {
483 if (txn != null) {
484 TransactionImpl.ensureTxnActive(txn);
485 BaseInternalTransactionV4<?> v4txn = InternalTransactionV4.getById(txn.getId());
486 for (Key key : keys) {
487 v4txn.deferDelete(key);
489 return new FutureHelper.FakeFuture<Void>(null);
492 CommitRequest.Builder baseReq = CommitRequest.newBuilder();
493 baseReq.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
494 Iterator<CommitRequest.Builder> batches = deleteBatcher.getBatches(keys, baseReq,
495 baseReq.build().getSerializedSize(), true);
496 List<Future<CommitResponse>> futures = deleteBatcher.makeCalls(batches);
497 return new MultiFuture<CommitResponse, Void>(futures) {
498 @Override
499 public Void get() throws InterruptedException, ExecutionException {
500 for (Future<CommitResponse> future : futures) {
501 future.get();
503 return null;
506 @Override
507 public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
508 TimeoutException {
509 for (Future<CommitResponse> future : futures) {
510 future.get(timeout, unit);
512 return null;
518 * This API is specific to sequential IDs, which V4 does not support.
520 @Override
521 public Future<KeyRange> allocateIds(final Key parent, final String kind, long num) {
522 throw new UnsupportedOperationException();
526 * This API is specific to sequential IDs, which V4 does not support.
528 @Override
529 public Future<KeyRangeState> allocateIdRange(final KeyRange range) {
530 throw new UnsupportedOperationException();
534 * Allocates scattered IDs for a list of incomplete keys.
536 protected Future<List<EntityV4.Key>> allocateIds(List<Key> keyList) {
537 List<Integer> order = Lists.newArrayListWithCapacity(keyList.size());
538 Iterator<AllocateIdsRequest.Builder> batches = allocateIdsBatcher.getBatches(keyList,
539 AllocateIdsRequest.newBuilder(), 0, true, order);
540 List<Future<AllocateIdsResponse>> futures = allocateIdsBatcher.makeCalls(batches);
542 return new ReorderingMultiFuture<AllocateIdsResponse, List<EntityV4.Key>>(futures, order) {
543 @Override
544 protected List<EntityV4.Key> aggregate(AllocateIdsResponse batch, Iterator<Integer> indexItr,
545 List<EntityV4.Key> result) {
546 for (EntityV4.Key key : batch.getAllocatedList()) {
547 result.set(indexItr.next(), key);
549 return result;
552 @Override
553 protected List<com.google.apphosting.datastore.EntityV4.Key> initResult(int size) {
554 return Arrays.asList(new EntityV4.Key[size]);
559 @Override
560 public Future<Map<Index, IndexState>> getIndexes() {
561 throw new UnsupportedOperationException();
565 * Update a key object with the id in the proto, if one exists.
567 static void updateKey(EntityV4.Key v4Key, Key key) {
568 List<EntityV4.Key.PathElement> pathElements = v4Key.getPathElementList();
569 if (!pathElements.isEmpty()) {
570 PathElement lastElement = pathElements.get(pathElements.size() - 1);
571 if (lastElement.hasId()) {
572 key.setId(lastElement.getId());