Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / datastore / BaseAsyncDatastoreServiceImpl.java
blob92576d8c60c0be4cd43fede3b80bb0747ea9cdf4
1 // Copyright 2010 Google Inc. All Rights Reserved.
3 package com.google.appengine.api.datastore;
5 import static com.google.appengine.api.datastore.FutureHelper.quietGet;
6 import static com.google.common.base.Preconditions.checkArgument;
8 import com.google.appengine.api.datastore.Batcher.IndexedItem;
9 import com.google.appengine.api.datastore.DatastoreAttributes.DatastoreType;
10 import com.google.appengine.api.datastore.EntityCachingStrategy.NoOpEntityCachingStrategy;
11 import com.google.appengine.api.datastore.EntityCachingStrategy.PreGetCachingResult;
12 import com.google.appengine.api.datastore.EntityCachingStrategy.PreMutationCachingResult;
13 import com.google.appengine.api.utils.FutureWrapper;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.Lists;
16 import com.google.common.collect.Sets;
17 import com.google.protobuf.Message;
18 import com.google.protobuf.MessageLite;
19 import com.google.protobuf.MessageLiteOrBuilder;
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.Collection;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.Future;
30 import java.util.logging.Logger;
/**
 * State and behavior that is common to all asynchronous Datastore API implementations.
 */
36 abstract class BaseAsyncDatastoreServiceImpl
37 implements AsyncDatastoreServiceInternal, CurrentTransactionProvider {
/**
 * It doesn't actually matter what this value is, the back end will set its
 * own deadline. All that matters is that we set a value.
 */
static final long ARBITRARY_FAILOVER_READ_MS = -1;

/**
 * User-provided config options.
 */
final DatastoreServiceConfig datastoreServiceConfig;

/**
 * Knows which transaction to use when the user does not explicitly provide
 * one.
 */
final TransactionStack defaultTxnProvider;

/**
 * Caching strategy consulted around get, put and delete operations. Assigned in the constructor
 * based on whether the config carries an entity cache config.
 */
EntityCachingStrategy entityCachingStrategy;

/** Logger named after the runtime class of this instance. */
final Logger logger = Logger.getLogger(getClass().getName());

/** Cached datastore type; stays {@code null} until {@code getDatastoreType()} resolves it. */
private DatastoreType datastoreType;

/** Query runner supplied at construction and passed to the prepared queries this class builds. */
private final QueryRunner queryRunner;
63 /**
64 * A base batcher for operations executed in the context of a {@link DatastoreService}.
65 * @param <S> the response message type
66 * @param <R> the request message type
67 * @param <F> the Java specific representation of a value
68 * @param <T> the proto representation of value
70 abstract class BaseRpcBatcher<S extends Message, R extends MessageLiteOrBuilder, F,
71 T extends MessageLite> extends Batcher<R, F, T> {
72 abstract Future<S> makeCall(R batch);
74 @Override
75 final int getMaxSize() {
76 return datastoreServiceConfig.maxRpcSizeBytes;
79 @Override
80 final int getMaxGroups() {
81 return datastoreServiceConfig.maxEntityGroupsPerRpc;
84 final List<Future<S>> makeCalls(Iterator<R> batches) {
85 List<Future<S>> futures = new ArrayList<Future<S>>();
86 while (batches.hasNext()) {
87 futures.add(makeCall(batches.next()));
89 return futures;
93 BaseAsyncDatastoreServiceImpl(DatastoreServiceConfig datastoreServiceConfig,
94 TransactionStack defaultTxnProvider, QueryRunner queryRunner) {
95 this.datastoreServiceConfig = datastoreServiceConfig;
96 this.defaultTxnProvider = defaultTxnProvider;
97 this.queryRunner = queryRunner;
98 if (datastoreServiceConfig.getEntityCacheConfig() == null) {
99 entityCachingStrategy = NoOpEntityCachingStrategy.INSTANCE;
100 } else {
101 entityCachingStrategy = EntityCachingStrategy.createStrategy(datastoreServiceConfig);
/**
 * Implementation hook invoked by {@code createTransaction} to obtain the internal transaction
 * that backs a newly created {@code TransactionImpl}.
 *
 * @param options the options the transaction was requested with
 * @return the internal transaction object
 */
protected abstract TransactionImpl.InternalTransaction
    doBeginTransaction(TransactionOptions options);
/**
 * Implementation hook that performs the actual batch lookup for {@code get}.
 *
 * @param txn the transaction passed through from {@code get}
 * @param keysToGet the keys still to be loaded after callbacks and caching removed some
 * @param resultMap map that already holds entities supplied before the lookup
 *     (NOTE(review): presumably the fetched entities are added to this same map; confirm in
 *     the implementations)
 * @return a future for the key-to-entity map
 */
protected abstract Future<Map<Key, Entity>> doBatchGet(Transaction txn,
    final Set<Key> keysToGet, final Map<Key, Entity> resultMap);
/**
 * Implementation hook that performs the actual batch write for {@code put}.
 *
 * @param txn the transaction to write in; callers in this class may pass {@code null}
 * @param entities the entities to write, with cache-skipped entities already removed
 * @return a future for the keys of the written entities
 */
protected abstract Future<List<Key>> doBatchPut(Transaction txn,
    final List<Entity> entities);
/**
 * Implementation hook that performs the actual batch delete for {@code delete}.
 *
 * @param txn the transaction to delete in; callers in this class may pass {@code null}
 * @param keys the keys to delete, with cache-skipped keys already removed
 * @return a future that completes when the delete has been performed
 */
protected abstract Future<Void> doBatchDelete(Transaction txn,
    Collection<Key> keys);
117 @SuppressWarnings("deprecation")
118 static void validateQuery(Query query) {
119 checkArgument(query.getFilterPredicates().isEmpty() || query.getFilter() == null,
120 "A query cannot have both a filter and filter predicates set.");
121 checkArgument(query.getProjections().isEmpty() || !query.isKeysOnly(),
122 "A query cannot have both projections and keys-only set.");
126 * Return the current transaction if one already exists, otherwise create
127 * a new transaction or throw an exception according to the
128 * {@link ImplicitTransactionManagementPolicy}.
130 GetOrCreateTransactionResult getOrCreateTransaction() {
131 Transaction currentTxn = getCurrentTransaction(null);
132 if (currentTxn != null) {
133 return new GetOrCreateTransactionResult(false, currentTxn);
136 switch(datastoreServiceConfig.getImplicitTransactionManagementPolicy()) {
137 case NONE:
138 return new GetOrCreateTransactionResult(false, null);
139 case AUTO:
140 return new GetOrCreateTransactionResult(true, createTransaction(
141 TransactionOptions.Builder.withDefaults(), false));
142 default:
143 final String msg = "Unexpected Transaction Creation Policy: "
144 + datastoreServiceConfig.getImplicitTransactionManagementPolicy();
145 logger.severe(msg);
146 throw new IllegalArgumentException(msg);
/**
 * Returns the result of {@code defaultTxnProvider.peek()}.
 * NOTE(review): behavior when no transaction is active is decided by
 * {@code TransactionStack.peek()}; confirm there.
 */
@Override
public Transaction getCurrentTransaction() {
  return defaultTxnProvider.peek();
}
/**
 * Returns the result of {@code defaultTxnProvider.peek(returnedIfNoTxn)}.
 *
 * @param returnedIfNoTxn fallback handed to the transaction stack; judging by its name it is
 *     returned when there is no current transaction (confirm against {@code TransactionStack})
 */
@Override
public Transaction getCurrentTransaction(Transaction returnedIfNoTxn) {
  return defaultTxnProvider.peek(returnedIfNoTxn);
}
/** Returns the config this service was constructed with. */
DatastoreServiceConfig getDatastoreServiceConfig() {
  return datastoreServiceConfig;
}
164 @Override
165 public Future<Entity> get(Key key) {
166 if (key == null) {
167 throw new NullPointerException("key cannot be null");
169 return wrapSingleGet(key, get(Arrays.asList(key)));
172 @Override
173 public Future<Entity> get( Transaction txn, final Key key) {
174 if (key == null) {
175 throw new NullPointerException("key cannot be null");
177 return wrapSingleGet(key, get(txn, Arrays.asList(key)));
180 private Future<Entity> wrapSingleGet(final Key key, Future<Map<Key, Entity>> futureEntities) {
181 return new FutureWrapper<Map<Key, Entity>, Entity>(futureEntities) {
182 @Override
183 protected Entity wrap(Map<Key, Entity> entities) throws Exception {
184 Entity entity = entities.get(key);
185 if (entity == null) {
186 throw new EntityNotFoundException(key);
188 return entity;
191 @Override
192 protected Throwable convertException(Throwable cause) {
193 return cause;
198 @Override
199 public Future<Map<Key, Entity>> get(final Iterable<Key> keys) {
200 return new TransactionRunner<Map<Key, Entity>>(getOrCreateTransaction()) {
201 @Override
202 protected Future<Map<Key, Entity>> runInternal(Transaction txn) {
203 return get(txn, keys);
205 }.runReadInTransaction();
208 @Override
209 public Future<Map<Key, Entity>> get(Transaction txn, Iterable<Key> keys) {
210 if (keys == null) {
211 throw new NullPointerException("keys cannot be null");
214 List<Key> keyList = Lists.newArrayList(keys);
216 Map<Key, Entity> resultMap = new HashMap<Key, Entity>();
217 PreGetContext preGetContext = new PreGetContext(this, keyList, resultMap);
218 datastoreServiceConfig.getDatastoreCallbacks().executePreGetCallbacks(preGetContext);
220 keyList.removeAll(resultMap.keySet());
222 PreGetCachingResult preGetCachingResult =
223 entityCachingStrategy.preGet(this, keyList, resultMap);
224 keyList.removeAll(preGetCachingResult.getKeysToSkipLoading());
226 Future<Map<Key, Entity>> result = doBatchGet(txn, Sets.newLinkedHashSet(keyList), resultMap);
228 result = entityCachingStrategy.createPostGetFuture(result, preGetCachingResult);
229 return new PostLoadFuture(result, datastoreServiceConfig.getDatastoreCallbacks(), this);
/**
 * Writes a single entity by delegating to the batch {@code put(Iterable)} with a one-element
 * list and returning the first key of the batch result.
 *
 * @param entity the entity to write
 * @return a future for the key of the written entity
 */
@Override
public Future<Key> put(Entity entity) {
  return wrapSinglePut(put(Arrays.asList(entity)));
}
/**
 * Writes a single entity with the given transaction by delegating to the batch
 * {@code put(Transaction, Iterable)} with a one-element list and returning the first key of the
 * batch result.
 *
 * @param txn the transaction passed through to the batch put
 * @param entity the entity to write
 * @return a future for the key of the written entity
 */
@Override
public Future<Key> put(Transaction txn, Entity entity) {
  return wrapSinglePut(put(txn, Arrays.asList(entity)));
}
/**
 * Adapts a batch-put future into a future for its first key.
 *
 * @param futureKeys the pending batch result
 * @return a future yielding element 0 of the batch result
 */
private Future<Key> wrapSinglePut(Future<List<Key>> futureKeys) {
  return new FutureWrapper<List<Key>, Key>(futureKeys) {
    @Override
    protected Key wrap(List<Key> keys) throws Exception {
      return keys.get(0);
    }

    // Exceptions are passed through unchanged.
    @Override
    protected Throwable convertException(Throwable cause) {
      return cause;
    }
  };
}
256 @Override
257 public Future<List<Key>> put(final Iterable<Entity> entities) {
258 return new TransactionRunner<List<Key>>(getOrCreateTransaction()) {
259 @Override
260 protected Future<List<Key>> runInternal(Transaction txn) {
261 return put(txn, entities);
263 }.runWriteInTransaction();
266 @Override
267 public Future<List<Key>> put( Transaction txn, Iterable<Entity> entities) {
268 List<Entity> entityList = entities instanceof List
269 ? (List<Entity>) entities : Lists.newArrayList(entities);
270 PutContext prePutContext = new PutContext(this, entityList);
271 datastoreServiceConfig.getDatastoreCallbacks().executePrePutCallbacks(prePutContext);
272 PreMutationCachingResult preMutationCachingResult =
273 entityCachingStrategy.prePut(this, entityList);
275 List<IndexedItem<Key>> indexedKeysToSkip = Lists.newArrayList();
276 Set<Key> mutationKeysToSkip = preMutationCachingResult.getMutationKeysToSkip();
277 List<Entity> entitiesToPut;
278 if (!mutationKeysToSkip.isEmpty()) {
279 entitiesToPut = Lists.newArrayList();
280 int index = 0;
281 for (Entity entity : entityList) {
282 if (mutationKeysToSkip.contains(entity.getKey())) {
283 indexedKeysToSkip.add(new IndexedItem<Key>(index++, entity.getKey()));
284 } else {
285 entitiesToPut.add(entity);
286 ++index;
289 } else {
290 entitiesToPut = ImmutableList.copyOf(entities);
293 Future<List<Key>> result = combinePutResult(doBatchPut(txn, entitiesToPut), indexedKeysToSkip);
295 if (txn == null) {
296 result =
297 entityCachingStrategy.createPostMutationFuture(result, preMutationCachingResult);
298 PutContext postPutContext = new PutContext(this, entityList);
299 result = new PostPutFuture(result, datastoreServiceConfig.getDatastoreCallbacks(),
300 postPutContext);
301 } else {
302 defaultTxnProvider.addPutEntities(txn, entityList);
304 return result;
307 private Future<List<Key>> combinePutResult(Future<List<Key>> rpcResult,
308 final List<IndexedItem<Key>> skippedKeys) {
309 if (skippedKeys.isEmpty()) {
310 return rpcResult;
313 return new FutureWrapper<List<Key>, List<Key>>(rpcResult) {
314 @Override
315 protected List<Key> wrap(List<Key> result) throws Exception {
316 List<Key> combined = Lists.newLinkedList(result);
317 for (IndexedItem<Key> indexedKey : skippedKeys) {
318 combined.add(indexedKey.index, indexedKey.item);
320 return combined;
323 @Override
324 protected Throwable convertException(Throwable cause) {
325 return cause;
/**
 * Varargs convenience overload; wraps the array with {@code Arrays.asList} and delegates to
 * {@code delete(Iterable)}.
 *
 * @param keys the keys to delete
 * @return a future that completes when the delete has been performed
 */
@Override
public Future<Void> delete(Key... keys) {
  return delete(Arrays.asList(keys));
}
/**
 * Varargs convenience overload; wraps the array with {@code Arrays.asList} and delegates to
 * {@code delete(Transaction, Iterable)}.
 *
 * @param txn the transaction passed through to the batch delete
 * @param keys the keys to delete
 * @return a future that completes when the delete has been performed
 */
@Override
public Future<Void> delete(Transaction txn, Key... keys) {
  return delete(txn, Arrays.asList(keys));
}
340 @Override
341 public Future<Void> delete(final Iterable<Key> keys) {
342 return new TransactionRunner<Void>(getOrCreateTransaction()) {
343 @Override
344 protected Future<Void> runInternal(Transaction txn) {
345 return delete(txn, keys);
347 }.runWriteInTransaction();
350 @Override
351 public Future<Void> delete(Transaction txn, Iterable<Key> keys) {
352 List<Key> allKeys = keys instanceof List
353 ? (List<Key>) keys : ImmutableList.copyOf(keys);
354 DeleteContext preDeleteContext = new DeleteContext(this, allKeys);
355 datastoreServiceConfig.getDatastoreCallbacks().executePreDeleteCallbacks(preDeleteContext);
356 PreMutationCachingResult preMutationCachingResult =
357 entityCachingStrategy.preDelete(this, allKeys);
358 Future<Void> result = null;
359 Collection<Key> keysToDelete;
360 Set<Key> keysToSkip = preMutationCachingResult.getMutationKeysToSkip();
361 if (keysToSkip.isEmpty()) {
362 keysToDelete = allKeys;
363 } else {
364 Set<Key> keySet = Sets.newHashSet(allKeys);
365 keySet.removeAll(keysToSkip);
366 keysToDelete = keySet;
368 result = doBatchDelete(txn, keysToDelete);
370 if (txn == null) {
371 result = entityCachingStrategy.createPostMutationFuture(result, preMutationCachingResult);
372 result = new PostDeleteFuture(
373 result, datastoreServiceConfig.getDatastoreCallbacks(),
374 new DeleteContext(this, allKeys));
375 } else {
376 defaultTxnProvider.addDeletedKeys(txn, allKeys);
378 return result;
/** Returns the result of {@code defaultTxnProvider.getAll()}. */
@Override
public Collection<Transaction> getActiveTransactions() {
  return defaultTxnProvider.getAll();
}
387 * Register the provided future with the provided txn so that we know to
388 * perform a {@link java.util.concurrent.Future#get()} before the txn is
389 * committed.
391 * @param txn The txn with which the future must be associated.
392 * @param future The future to associate with the txn.
393 * @param <T> The type of the Future
394 * @return The same future that was passed in, for caller convenience.
396 protected final <T> Future<T> registerInTransaction( Transaction txn,
397 Future<T> future) {
398 if (txn != null) {
399 defaultTxnProvider.addFuture(txn, future);
400 return new FutureHelper.TxnAwareFuture<T>(future, txn, defaultTxnProvider);
402 return future;
/**
 * Begins a transaction with default options; delegates to
 * {@code beginTransaction(TransactionOptions)}.
 */
@Override
public Future<Transaction> beginTransaction() {
  return beginTransaction(TransactionOptions.Builder.withDefaults());
}
410 @Override
411 public Future<Transaction> beginTransaction(TransactionOptions options) {
412 Transaction txn = createTransaction(options, true);
414 defaultTxnProvider.push(txn);
416 return new FutureHelper.FakeFuture<Transaction>(txn);
419 private Transaction createTransaction(TransactionOptions options, boolean isExplicit) {
420 return new TransactionImpl(
421 datastoreServiceConfig.getAppIdNamespace().getAppId(), defaultTxnProvider,
422 datastoreServiceConfig.getDatastoreCallbacks(), entityCachingStrategy, isExplicit,
423 doBeginTransaction(options));
/**
 * Prepares a query with a {@code null} transaction; delegates to
 * {@code prepare(Transaction, Query)}.
 */
@Override
public PreparedQuery prepare(Query query) {
  return prepare(null, query);
}
431 @SuppressWarnings("deprecation")
432 @Override
433 public PreparedQuery prepare(Transaction txn, Query query) {
434 PreQueryContext context = new PreQueryContext(this, query);
435 datastoreServiceConfig.getDatastoreCallbacks().executePreQueryCallbacks(context);
437 query = context.getElements().get(0);
438 validateQuery(query);
439 if (isGeoQuery(query)) {
440 return new PreparedQueryImpl(query, txn, queryRunner);
442 List<MultiQueryBuilder> queriesToRun = QuerySplitHelper.splitQuery(query);
443 query.setFilter(null);
444 query.getFilterPredicates().clear();
445 if (queriesToRun.size() == 1 && queriesToRun.get(0).isSingleton()) {
446 query.getFilterPredicates().addAll(queriesToRun.get(0).getBaseFilters());
447 return new PreparedQueryImpl(query, txn, queryRunner);
449 return new PreparedMultiQuery(query, queriesToRun, txn, queryRunner);
452 /** Determines whether the query has a geo-spatial filter. */
453 private boolean isGeoQuery(Query query) {
454 Query.Filter filter = query.getFilter();
455 if (filter == null) {
456 return false;
458 return isGeoFilter(filter);
462 * Walks the filter tree searching for a geo-spatial component.
464 * @return true, if we find an StContainsFilter, without thoroughly
465 * validating that the rest of the tree is compatible with the geo-filter.
467 private boolean isGeoFilter(Query.Filter filter) {
468 if (filter instanceof Query.StContainsFilter) {
469 return true;
471 if (filter instanceof Query.CompositeFilter) {
472 for (Query.Filter f : ((Query.CompositeFilter) filter).getSubFilters()) {
473 if (isGeoFilter(f)) {
474 return true;
478 return false;
/**
 * Delegates to the three-argument {@code allocateIds} overload with {@code null} as the first
 * argument.
 * NOTE(review): the first argument is presumably the parent key; confirm against the overload,
 * which is not shown here.
 *
 * @param kind passed through unchanged
 * @param num passed through unchanged
 */
@Override
public Future<KeyRange> allocateIds(String kind, long num) {
  return allocateIds(null, kind, num);
}
486 protected DatastoreType getDatastoreType() {
487 if (datastoreType == null) {
488 datastoreType = quietGet(getDatastoreAttributes()).getDatastoreType();
490 return datastoreType;
493 @Override
494 public Future<DatastoreAttributes> getDatastoreAttributes() {
495 String appId = datastoreServiceConfig.getAppIdNamespace().getAppId();
496 DatastoreAttributes attributes = new DatastoreAttributes(appId);
497 return new FutureHelper.FakeFuture<DatastoreAttributes>(attributes);