Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / datastore / AsyncDatastoreServiceImpl.java
blob007673469838f45f8615fc27980bc25777905ae3
1 // Copyright 2010 Google Inc. All rights reserved.
3 package com.google.appengine.api.datastore;
5 import static com.google.appengine.api.datastore.DatastoreApiHelper.makeAsyncCall;
6 import static com.google.appengine.api.datastore.DatastoreAttributes.DatastoreType.MASTER_SLAVE;
7 import static com.google.appengine.api.datastore.FetchOptions.Builder.withLimit;
8 import static com.google.appengine.api.datastore.ReadPolicy.Consistency.EVENTUAL;
10 import com.google.appengine.api.datastore.Batcher.ReorderingMultiFuture;
11 import com.google.appengine.api.datastore.DatastoreService.KeyRangeState;
12 import com.google.appengine.api.datastore.FutureHelper.MultiFuture;
13 import com.google.appengine.api.datastore.Index.IndexState;
14 import com.google.appengine.api.datastore.Query.FilterOperator;
15 import com.google.appengine.api.utils.FutureWrapper;
16 import com.google.apphosting.api.ApiBasePb.StringProto;
17 import com.google.apphosting.api.ApiProxy.ApiConfig;
18 import com.google.apphosting.datastore.DatastoreV3Pb;
19 import com.google.apphosting.datastore.DatastoreV3Pb.AllocateIdsRequest;
20 import com.google.apphosting.datastore.DatastoreV3Pb.AllocateIdsResponse;
21 import com.google.apphosting.datastore.DatastoreV3Pb.CompositeIndices;
22 import com.google.apphosting.datastore.DatastoreV3Pb.DatastoreService_3.Method;
23 import com.google.apphosting.datastore.DatastoreV3Pb.DeleteRequest;
24 import com.google.apphosting.datastore.DatastoreV3Pb.DeleteResponse;
25 import com.google.apphosting.datastore.DatastoreV3Pb.GetRequest;
26 import com.google.apphosting.datastore.DatastoreV3Pb.GetResponse;
27 import com.google.apphosting.datastore.DatastoreV3Pb.PutRequest;
28 import com.google.apphosting.datastore.DatastoreV3Pb.PutResponse;
29 import com.google.common.collect.Lists;
30 import com.google.common.collect.Maps;
31 import com.google.io.protocol.ProtocolMessage;
32 import com.google.storage.onestore.v3.OnestoreEntity.CompositeIndex;
33 import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
34 import com.google.storage.onestore.v3.OnestoreEntity.Reference;
36 import java.util.ArrayList;
37 import java.util.Collection;
38 import java.util.Collections;
39 import java.util.Iterator;
40 import java.util.LinkedHashMap;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Set;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.TimeoutException;
48 import java.util.logging.Level;
/**
 * An implementation of AsyncDatastoreService using the DatastoreV3 API.
 */
54 class AsyncDatastoreServiceImpl extends BaseAsyncDatastoreServiceImpl {
56 /**
57 * A base batcher for DatastoreV3 operations executed in the context of an {@link
58 * AsyncDatastoreServiceImpl}.
59 * @param <S> the response message type
60 * @param <R> the request message type
61 * @param <F> the Java specific representation of a value
62 * @param <T> the proto representation of value
64 private abstract class V3Batcher<S extends ProtocolMessage<S>, R extends ProtocolMessage<R>,
65 F, T extends ProtocolMessage<T>> extends BaseRpcBatcher<S, R, F, T> {
66 @Override
67 final R newBatch(R baseBatch) {
68 return baseBatch.clone();
72 /**
73 * A base batcher for operations that operate on {@link Key}s.
74 * @param <S> the response message type
75 * @param <R> the request message type
77 private abstract class V3KeyBatcher<S extends ProtocolMessage<S>, R extends ProtocolMessage<R>>
78 extends V3Batcher<S, R, Key, Reference> {
79 @Override
80 final Object getGroup(Key value) {
81 return value.getRootKey();
84 @Override
85 final Reference toPb(Key value) {
86 return KeyTranslator.convertToPb(value);
90 private final V3KeyBatcher<DeleteResponse, DeleteRequest> deleteBatcher =
91 new V3KeyBatcher<DeleteResponse, DeleteRequest>() {
92 @Override
93 void addToBatch(Reference value, DeleteRequest batch) {
94 batch.addKey(value);
97 @Override
98 int getMaxCount() {
99 return datastoreServiceConfig.maxBatchWriteEntities;
102 @Override
103 protected Future<DeleteResponse> makeCall(DeleteRequest batch) {
104 return makeAsyncCall(apiConfig, Method.Delete, batch, new DeleteResponse());
108 private final V3KeyBatcher<GetResponse, GetRequest> getByKeyBatcher =
109 new V3KeyBatcher<GetResponse, GetRequest>() {
110 @Override
111 void addToBatch(Reference value, GetRequest batch) {
112 batch.addKey(value);
115 @Override
116 int getMaxCount() {
117 return datastoreServiceConfig.maxBatchReadEntities;
120 @Override
121 protected Future<GetResponse> makeCall(GetRequest batch) {
122 return makeAsyncCall(apiConfig, Method.Get, batch, new GetResponse());
126 private final V3Batcher<GetResponse, GetRequest, Reference, Reference> getByReferenceBatcher =
127 new V3Batcher<GetResponse, GetRequest, Reference, Reference>() {
128 @Override
129 final Object getGroup(Reference value) {
130 return value.getPath().getElement(0);
133 @Override
134 final Reference toPb(Reference value) {
135 return value;
138 @Override
139 void addToBatch(Reference value, GetRequest batch) {
140 batch.addKey(value);
143 @Override
144 int getMaxCount() {
145 return datastoreServiceConfig.maxBatchReadEntities;
148 @Override
149 protected Future<GetResponse> makeCall(GetRequest batch) {
150 return makeAsyncCall(apiConfig, Method.Get, batch, new GetResponse());
154 private final V3Batcher<PutResponse, PutRequest, Entity, EntityProto> putBatcher =
155 new V3Batcher<PutResponse, PutRequest, Entity, EntityProto>() {
156 @Override
157 Object getGroup(Entity value) {
158 return value.getKey().getRootKey();
161 @Override
162 void addToBatch(EntityProto value, PutRequest batch) {
163 batch.addEntity(value);
166 @Override
167 int getMaxCount() {
168 return datastoreServiceConfig.maxBatchWriteEntities;
171 @Override
172 protected Future<PutResponse> makeCall(PutRequest batch) {
173 return makeAsyncCall(apiConfig, Method.Put, batch, new PutResponse());
176 @Override
177 EntityProto toPb(Entity value) {
178 return EntityTranslator.convertToPb(value);
/** API configuration passed to every asynchronous call made by this service. */
private final ApiConfig apiConfig;

/**
 * Creates a service whose queries are run by a {@link QueryRunnerV3} built from the same
 * configuration objects.
 *
 * @param datastoreServiceConfig the datastore configuration, forwarded to the superclass
 * @param apiConfig the API configuration used for every RPC issued by this instance
 * @param defaultTxnProvider the transaction stack, forwarded to the superclass
 */
public AsyncDatastoreServiceImpl(
    DatastoreServiceConfig datastoreServiceConfig,
    ApiConfig apiConfig,
    TransactionStack defaultTxnProvider) {
  super(
      datastoreServiceConfig,
      defaultTxnProvider,
      new QueryRunnerV3(datastoreServiceConfig, apiConfig));
  this.apiConfig = apiConfig;
}
191 @Override
192 protected TransactionImpl.InternalTransaction doBeginTransaction(TransactionOptions options) {
193 DatastoreV3Pb.Transaction remoteTxn = new DatastoreV3Pb.Transaction();
194 DatastoreV3Pb.BeginTransactionRequest request = new DatastoreV3Pb.BeginTransactionRequest();
195 request.setApp(datastoreServiceConfig.getAppIdNamespace().getAppId());
196 request.setAllowMultipleEg(options.isXG());
198 Future<DatastoreV3Pb.Transaction> future =
199 DatastoreApiHelper.makeAsyncCall(apiConfig, Method.BeginTransaction, request, remoteTxn);
201 return new InternalTransactionV3(apiConfig, request.getApp(), future);
204 @Override
205 protected final Future<Map<Key, Entity>> doBatchGet( Transaction txn, final Set<Key> keysToGet, final Map<Key, Entity> resultMap) {
206 final GetRequest baseReq = new GetRequest();
207 baseReq.setAllowDeferred(true);
208 if (txn != null) {
209 TransactionImpl.ensureTxnActive(txn);
210 baseReq.setTransaction(InternalTransactionV3.localTxnToRemoteTxn(txn));
212 if (datastoreServiceConfig.getReadPolicy().getConsistency() == EVENTUAL) {
213 baseReq.setFailoverMs(ARBITRARY_FAILOVER_READ_MS);
214 baseReq.setStrong(false);
217 final boolean shouldUseMultipleBatches = getDatastoreType() != MASTER_SLAVE && txn == null
218 && datastoreServiceConfig.getReadPolicy().getConsistency() != EVENTUAL;
220 Iterator<GetRequest> batches = getByKeyBatcher.getBatches(keysToGet, baseReq,
221 baseReq.getSerializedSize(), shouldUseMultipleBatches);
222 List<Future<GetResponse>> futures = getByKeyBatcher.makeCalls(batches);
224 return registerInTransaction(txn, new MultiFuture<GetResponse, Map<Key, Entity>>(futures) {
226 * A Map from a Reference without an App Id specified to the corresponding Key that the user
227 * requested. This is a workaround for the Remote API to support matching requested Keys to
228 * Entities that may be from a different App Id .
230 private Map<Reference, Key> keyMapIgnoringAppId;
232 @Override
233 public Map<Key, Entity> get() throws InterruptedException, ExecutionException {
234 try {
235 aggregate(futures, null, null);
236 } catch (TimeoutException e) {
237 throw new RuntimeException(e);
239 return resultMap;
242 @Override
243 public Map<Key, Entity> get(long timeout, TimeUnit unit)
244 throws InterruptedException, ExecutionException, TimeoutException {
245 aggregate(futures, timeout, unit);
246 return resultMap;
250 * Aggregates the results of the given Futures and issues (synchronous) followup requests if
251 * any results were deferred.
253 * @param currentFutures the Futures corresponding to the batches of the initial GetRequests.
254 * @param timeout the timeout to use while waiting on the Future, or null for none.
255 * @param timeoutUnit the unit of the timeout, or null for none.
257 private void aggregate(
258 Iterable<Future<GetResponse>> currentFutures, Long timeout, TimeUnit timeoutUnit)
259 throws ExecutionException, InterruptedException, TimeoutException {
260 while (true) {
261 List<Reference> deferredRefs = Lists.newLinkedList();
263 for (Future<GetResponse> currentFuture : currentFutures) {
264 GetResponse resp = getFutureWithOptionalTimeout(currentFuture, timeout, timeoutUnit);
265 addEntitiesToResultMap(resp);
266 deferredRefs.addAll(resp.deferreds());
269 if (deferredRefs.isEmpty()) {
270 break;
273 Iterator<GetRequest> followupBatches = getByReferenceBatcher.getBatches(deferredRefs,
274 baseReq, baseReq.getSerializedSize(), shouldUseMultipleBatches);
275 currentFutures = getByReferenceBatcher.makeCalls(followupBatches);
280 * Convenience method to get the result of a Future and optionally specify a timeout.
282 * @param future the Future to get.
283 * @param timeout the timeout to use while waiting on the Future, or null for none.
284 * @param timeoutUnit the unit of the timeout, or null for none.
285 * @return the result of the Future.
286 * @throws TimeoutException will only ever be thrown if a timeout is provided.
288 private GetResponse getFutureWithOptionalTimeout(
289 Future<GetResponse> future, Long timeout, TimeUnit timeoutUnit)
290 throws ExecutionException, InterruptedException, TimeoutException {
291 if (timeout == null) {
292 return future.get();
293 } else {
294 return future.get(timeout, timeoutUnit);
299 * Adds the Entities from the GetResponse to the resultMap. Will omit Keys that were missing.
300 * Handles Keys with different App Ids from the Entity.Key. See
301 * {@link #findKeyFromRequestIgnoringAppId(Reference)}
303 private void addEntitiesToResultMap(GetResponse response) {
304 for (GetResponse.Entity entityResult : response.entitys()) {
305 if (entityResult.hasEntity()) {
306 Entity responseEntity = EntityTranslator.createFromPb(entityResult.getEntity());
307 Key responseKey = responseEntity.getKey();
309 if (!keysToGet.contains(responseKey)) {
310 responseKey = findKeyFromRequestIgnoringAppId(entityResult.getEntity().getKey());
312 resultMap.put(responseKey, responseEntity);
318 * This is a hack to support calls going through the Remote API. The problem is:
320 * The requested Key may have a local app id.
321 * The returned Entity may have a remote app id.
323 * In this case, we want to return a Map.Entry with {LocalKey, RemoteEntity}. This way, the
324 * user can always do map.get(keyFromRequest).
326 * This method will find the corresponding requested LocalKey for a RemoteKey by ignoring the
327 * AppId field.
329 * Note that we used to be able to rely on the order of the Response Entities matching the
330 * order of Request Keys. We can no longer do so with the addition of Deferred results.
332 * @param referenceFromResponse the reference from the Response that did not match any of the
333 * requested Keys. (May be mutated.)
334 * @return the Key from the request that corresponds to the given Reference from the Response
335 * (ignoring AppId.)
337 private Key findKeyFromRequestIgnoringAppId(Reference referenceFromResponse) {
338 if (keyMapIgnoringAppId == null) {
339 keyMapIgnoringAppId = Maps.newHashMap();
340 for (Key requestKey : keysToGet) {
341 Reference requestKeyAsRefWithoutApp = KeyTranslator.convertToPb(requestKey).clearApp();
342 keyMapIgnoringAppId.put(requestKeyAsRefWithoutApp, requestKey);
346 Key result = keyMapIgnoringAppId.get(referenceFromResponse.clearApp());
347 if (result == null) {
348 throw new DatastoreFailureException("Internal error");
350 return result;
355 @Override
356 protected Future<List<Key>> doBatchPut( Transaction txn,
357 final List<Entity> entities) {
358 PutRequest baseReq = new PutRequest();
359 if (txn != null) {
360 TransactionImpl.ensureTxnActive(txn);
361 baseReq.setTransaction(InternalTransactionV3.localTxnToRemoteTxn(txn));
363 boolean group = !baseReq.hasTransaction();
364 List<Integer> order = Lists.newArrayListWithCapacity(entities.size());
365 Iterator<PutRequest> batches = putBatcher.getBatches(entities, baseReq,
366 baseReq.getSerializedSize(), group, order);
367 List<Future<PutResponse>> futures = putBatcher.makeCalls(batches);
369 return registerInTransaction(txn,
370 new ReorderingMultiFuture<PutResponse, List<Key>>(futures, order) {
371 @Override
372 protected List<Key> aggregate(
373 PutResponse intermediateResult, Iterator<Integer> indexItr, List<Key> result) {
374 for (Reference reference : intermediateResult.keys()) {
375 int index = indexItr.next();
376 Key key = entities.get(index).getKey();
377 KeyTranslator.updateKey(reference, key);
378 result.set(index, key);
380 return result;
383 @Override
384 protected List<Key> initResult(int size) {
385 List<Key> result = new ArrayList<Key>(Collections.<Key>nCopies(size, null));
386 return result;
391 @Override
392 protected Future<Void> doBatchDelete( Transaction txn, Collection<Key> keys) {
393 DeleteRequest baseReq = new DeleteRequest();
394 if (txn != null) {
395 TransactionImpl.ensureTxnActive(txn);
396 baseReq.setTransaction(InternalTransactionV3.localTxnToRemoteTxn(txn));
398 boolean group = !baseReq.hasTransaction();
399 Iterator<DeleteRequest> batches = deleteBatcher.getBatches(keys, baseReq,
400 baseReq.getSerializedSize(), group);
401 List<Future<DeleteResponse>> futures = deleteBatcher.makeCalls(batches);
402 return registerInTransaction(txn, new MultiFuture<DeleteResponse, Void>(futures) {
403 @Override
404 public Void get() throws InterruptedException, ExecutionException {
405 for (Future<DeleteResponse> future : futures) {
406 future.get();
408 return null;
411 @Override
412 public Void get(long timeout, TimeUnit unit)
413 throws InterruptedException, ExecutionException, TimeoutException {
414 for (Future<DeleteResponse> future : futures) {
415 future.get(timeout, unit);
417 return null;
422 static Reference buildAllocateIdsRef(
423 Key parent, String kind, AppIdNamespace appIdNamespace) {
424 if (parent != null && !parent.isComplete()) {
425 throw new IllegalArgumentException("parent key must be complete");
427 Key key = new Key(kind, parent, Key.NOT_ASSIGNED, "ignored", appIdNamespace);
428 return KeyTranslator.convertToPb(key);
431 @Override
432 public Future<KeyRange> allocateIds(final Key parent, final String kind, long num) {
433 if (num <= 0) {
434 throw new IllegalArgumentException("num must be > 0");
437 if (num > 1000000000) {
438 throw new IllegalArgumentException("num must be < 1 billion");
441 final AppIdNamespace appIdNamespace = datastoreServiceConfig.getAppIdNamespace();
442 Reference allocateIdsRef = buildAllocateIdsRef(parent, kind, appIdNamespace);
443 AllocateIdsRequest req =
444 new AllocateIdsRequest().setSize(num).setModelKey(allocateIdsRef);
445 AllocateIdsResponse resp = new AllocateIdsResponse();
446 Future<AllocateIdsResponse> future = makeAsyncCall(apiConfig, Method.AllocateIds, req, resp);
447 return new FutureWrapper<AllocateIdsResponse, KeyRange>(future) {
448 @Override
449 protected KeyRange wrap(AllocateIdsResponse resp) throws Exception {
450 return new KeyRange(parent, kind, resp.getStart(), resp.getEnd(), appIdNamespace);
453 @Override
454 protected Throwable convertException(Throwable cause) {
455 return cause;
460 @Override
461 public Future<KeyRangeState> allocateIdRange(final KeyRange range) {
462 Key parent = range.getParent();
463 final String kind = range.getKind();
464 final long start = range.getStart().getId();
465 long end = range.getEnd().getId();
467 AllocateIdsRequest req = new AllocateIdsRequest()
468 .setModelKey(AsyncDatastoreServiceImpl.buildAllocateIdsRef(parent, kind, null))
469 .setMax(end);
470 AllocateIdsResponse resp = new AllocateIdsResponse();
471 Future<AllocateIdsResponse> future = makeAsyncCall(apiConfig, Method.AllocateIds, req, resp);
472 return new FutureWrapper<AllocateIdsResponse, KeyRangeState>(future) {
473 @SuppressWarnings("deprecation")
474 @Override
475 protected KeyRangeState wrap(AllocateIdsResponse resp) throws Exception {
476 Query query = new Query(kind).setKeysOnly();
477 query.addFilter(
478 Entity.KEY_RESERVED_PROPERTY, FilterOperator.GREATER_THAN_OR_EQUAL, range.getStart());
479 query.addFilter(
480 Entity.KEY_RESERVED_PROPERTY, FilterOperator.LESS_THAN_OR_EQUAL, range.getEnd());
481 List<Entity> collision = prepare(query).asList(withLimit(1));
483 if (!collision.isEmpty()) {
484 return KeyRangeState.COLLISION;
487 boolean raceCondition = start < resp.getStart();
488 return raceCondition ? KeyRangeState.CONTENTION : KeyRangeState.EMPTY;
491 @Override
492 protected Throwable convertException(Throwable cause) {
493 return cause;
498 @Override
499 public Future<Map<Index, IndexState>> getIndexes() {
500 StringProto req = new StringProto();
501 req.setValue(datastoreServiceConfig.getAppIdNamespace().getAppId());
502 return new FutureWrapper<CompositeIndices, Map<Index, IndexState>>(makeAsyncCall(apiConfig,
503 Method.GetIndices, req, new CompositeIndices())) {
504 @Override
505 protected Map<Index, IndexState> wrap(CompositeIndices indices) throws Exception {
506 Map<Index, IndexState> answer = new LinkedHashMap<Index, IndexState>();
507 for (CompositeIndex ci : indices.indexs()) {
508 Index index = IndexTranslator.convertFromPb(ci);
509 switch (ci.getStateEnum()) {
510 case DELETED:
511 answer.put(index, IndexState.DELETING);
512 break;
513 case ERROR:
514 answer.put(index, IndexState.ERROR);
515 break;
516 case READ_WRITE:
517 answer.put(index, IndexState.SERVING);
518 break;
519 case WRITE_ONLY:
520 answer.put(index, IndexState.BUILDING);
521 break;
522 default:
523 logger.log(Level.WARNING, "Unrecognized index state for " + index);
524 break;
527 return answer;
530 @Override
531 protected Throwable convertException(Throwable cause) {
532 return cause;