App Engine Java SDK version 1.9.8
[gae.git] / java / src / main / com / google / appengine / api / datastore / BaseAsyncDatastoreServiceImpl.java
blobb14b161c47b3b236bbb8493be7473275cfc20f4e
1 // Copyright 2010 Google Inc. All Rights Reserved.
3 package com.google.appengine.api.datastore;
5 import static com.google.appengine.api.datastore.FutureHelper.quietGet;
6 import static com.google.common.base.Preconditions.checkArgument;
8 import com.google.appengine.api.datastore.Batcher.IndexedItem;
9 import com.google.appengine.api.datastore.DatastoreAttributes.DatastoreType;
10 import com.google.appengine.api.datastore.EntityCachingStrategy.NoOpEntityCachingStrategy;
11 import com.google.appengine.api.datastore.EntityCachingStrategy.PreGetCachingResult;
12 import com.google.appengine.api.datastore.EntityCachingStrategy.PreMutationCachingResult;
13 import com.google.appengine.api.utils.FutureWrapper;
14 import com.google.apphosting.api.ApiProxy;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.Lists;
17 import com.google.common.collect.Sets;
18 import com.google.protobuf.Message;
19 import com.google.protobuf.MessageLite;
20 import com.google.protobuf.MessageLiteOrBuilder;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.concurrent.Future;
31 import java.util.logging.Logger;
/**
 * State and behavior that is common to all asynchronous Datastore API implementations.
 */
37 abstract class BaseAsyncDatastoreServiceImpl
38 implements AsyncDatastoreServiceInternal, CurrentTransactionProvider {
39 /**
40 * It doesn't actually matter what this value is, the back end will set its
41 * own deadline. All that matters is that we set a value.
43 static final long ARBITRARY_FAILOVER_READ_MS = -1;
45 /**
46 * User-provided config options.
48 final DatastoreServiceConfig datastoreServiceConfig;
50 /**
51 * Config that we'll pass to all api calls.
53 final ApiProxy.ApiConfig apiConfig;
55 /**
56 * Knows which transaction to use when the user does not explicitly provide
57 * one.
59 final TransactionStack defaultTxnProvider;
61 EntityCachingStrategy entityCachingStrategy;
63 final Logger logger = Logger.getLogger(getClass().getName());
65 private DatastoreType datastoreType;
67 private final QueryRunner queryRunner;
69 /**
70 * A base batcher for operations executed in the context of a {@link DatastoreService}.
71 * @param <S> the response message type
72 * @param <R> the request message type
73 * @param <F> the Java specific representation of a value
74 * @param <T> the proto representation of value
76 abstract class BaseRpcBatcher<S extends Message, R extends MessageLiteOrBuilder, F,
77 T extends MessageLite> extends Batcher<R, F, T> {
78 abstract Future<S> makeCall(R batch);
80 @Override
81 final int getMaxSize() {
82 return datastoreServiceConfig.maxRpcSizeBytes;
85 @Override
86 final int getMaxGroups() {
87 return datastoreServiceConfig.maxEntityGroupsPerRpc;
90 final List<Future<S>> makeCalls(Iterator<R> batches) {
91 List<Future<S>> futures = new ArrayList<Future<S>>();
92 while (batches.hasNext()) {
93 futures.add(makeCall(batches.next()));
95 return futures;
99 BaseAsyncDatastoreServiceImpl(DatastoreServiceConfig datastoreServiceConfig,
100 TransactionStack defaultTxnProvider, QueryRunner queryRunner) {
101 this.datastoreServiceConfig = datastoreServiceConfig;
102 this.apiConfig = createApiConfig(datastoreServiceConfig);
103 this.defaultTxnProvider = defaultTxnProvider;
104 if (datastoreServiceConfig.getEntityCacheConfig() == null) {
105 entityCachingStrategy = NoOpEntityCachingStrategy.INSTANCE;
106 } else {
107 entityCachingStrategy = EntityCachingStrategy.createStrategy(datastoreServiceConfig);
109 this.queryRunner = queryRunner;
112 protected abstract TransactionImpl.InternalTransaction
113 doBeginTransaction(TransactionOptions options);
115 protected abstract Future<Map<Key, Entity>> doBatchGet( Transaction txn,
116 final Set<Key> keysToGet, final Map<Key, Entity> resultMap);
118 protected abstract Future<List<Key>> doBatchPut( Transaction txn,
119 final List<Entity> entities);
121 protected abstract Future<Void> doBatchDelete( Transaction txn,
122 Collection<Key> keys);
124 private ApiProxy.ApiConfig createApiConfig(DatastoreServiceConfig config) {
125 ApiProxy.ApiConfig apiConfig = new ApiProxy.ApiConfig();
126 apiConfig.setDeadlineInSeconds(config.getDeadline());
127 return apiConfig;
130 @SuppressWarnings("deprecation")
131 static void validateQuery(Query query) {
132 checkArgument(query.getFilterPredicates().isEmpty() || query.getFilter() == null,
133 "A query cannot have both a filter and filter predicates set.");
134 checkArgument(query.getProjections().isEmpty() || !query.isKeysOnly(),
135 "A query cannot have both projections and keys-only set.");
139 * Return the current transaction if one already exists, otherwise create
140 * a new transaction or throw an exception according to the
141 * {@link ImplicitTransactionManagementPolicy}.
143 GetOrCreateTransactionResult getOrCreateTransaction() {
144 Transaction currentTxn = getCurrentTransaction(null);
145 if (currentTxn != null) {
146 return new GetOrCreateTransactionResult(false, currentTxn);
149 switch(datastoreServiceConfig.getImplicitTransactionManagementPolicy()) {
150 case NONE:
151 return new GetOrCreateTransactionResult(false, null);
152 case AUTO:
153 return new GetOrCreateTransactionResult(true, createTransaction(
154 TransactionOptions.Builder.withDefaults(), false));
155 default:
156 final String msg = "Unexpected Transaction Creation Policy: "
157 + datastoreServiceConfig.getImplicitTransactionManagementPolicy();
158 logger.severe(msg);
159 throw new IllegalArgumentException(msg);
163 @Override
164 public Transaction getCurrentTransaction() {
165 return defaultTxnProvider.peek();
168 @Override
169 public Transaction getCurrentTransaction(Transaction returnedIfNoTxn) {
170 return defaultTxnProvider.peek(returnedIfNoTxn);
173 DatastoreServiceConfig getDatastoreServiceConfig() {
174 return datastoreServiceConfig;
177 @Override
178 public Future<Entity> get(Key key) {
179 if (key == null) {
180 throw new NullPointerException("key cannot be null");
182 return wrapSingleGet(key, get(Arrays.asList(key)));
185 @Override
186 public Future<Entity> get( Transaction txn, final Key key) {
187 if (key == null) {
188 throw new NullPointerException("key cannot be null");
190 return wrapSingleGet(key, get(txn, Arrays.asList(key)));
193 private Future<Entity> wrapSingleGet(final Key key, Future<Map<Key, Entity>> futureEntities) {
194 return new FutureWrapper<Map<Key, Entity>, Entity>(futureEntities) {
195 @Override
196 protected Entity wrap(Map<Key, Entity> entities) throws Exception {
197 Entity entity = entities.get(key);
198 if (entity == null) {
199 throw new EntityNotFoundException(key);
201 return entity;
204 @Override
205 protected Throwable convertException(Throwable cause) {
206 return cause;
211 @Override
212 public Future<Map<Key, Entity>> get(final Iterable<Key> keys) {
213 return new TransactionRunner<Map<Key, Entity>>(getOrCreateTransaction()) {
214 @Override
215 protected Future<Map<Key, Entity>> runInternal(Transaction txn) {
216 return get(txn, keys);
218 }.runReadInTransaction();
221 @Override
222 public Future<Map<Key, Entity>> get(Transaction txn, Iterable<Key> keys) {
223 if (keys == null) {
224 throw new NullPointerException("keys cannot be null");
227 List<Key> keyList = Lists.newArrayList(keys);
229 Map<Key, Entity> resultMap = new HashMap<Key, Entity>();
230 PreGetContext preGetContext = new PreGetContext(this, keyList, resultMap);
231 datastoreServiceConfig.getDatastoreCallbacks().executePreGetCallbacks(preGetContext);
233 keyList.removeAll(resultMap.keySet());
235 PreGetCachingResult preGetCachingResult =
236 entityCachingStrategy.preGet(this, keyList, resultMap);
237 keyList.removeAll(preGetCachingResult.getKeysToSkipLoading());
239 Future<Map<Key, Entity>> result = doBatchGet(txn, Sets.newLinkedHashSet(keyList), resultMap);
241 result = entityCachingStrategy.createPostGetFuture(result, preGetCachingResult);
242 return new PostLoadFuture(result, datastoreServiceConfig.getDatastoreCallbacks(), this);
245 @Override
246 public Future<Key> put(Entity entity) {
247 return wrapSinglePut(put(Arrays.asList(entity)));
250 @Override
251 public Future<Key> put(Transaction txn, Entity entity) {
252 return wrapSinglePut(put(txn, Arrays.asList(entity)));
255 private Future<Key> wrapSinglePut(Future<List<Key>> futureKeys) {
256 return new FutureWrapper<List<Key>, Key>(futureKeys) {
257 @Override
258 protected Key wrap(List<Key> keys) throws Exception {
259 return keys.get(0);
262 @Override
263 protected Throwable convertException(Throwable cause) {
264 return cause;
269 @Override
270 public Future<List<Key>> put(final Iterable<Entity> entities) {
271 return new TransactionRunner<List<Key>>(getOrCreateTransaction()) {
272 @Override
273 protected Future<List<Key>> runInternal(Transaction txn) {
274 return put(txn, entities);
276 }.runWriteInTransaction();
279 @Override
280 public Future<List<Key>> put( Transaction txn, Iterable<Entity> entities) {
281 List<Entity> entityList = entities instanceof List
282 ? (List<Entity>) entities : Lists.newArrayList(entities);
283 PutContext prePutContext = new PutContext(this, entityList);
284 datastoreServiceConfig.getDatastoreCallbacks().executePrePutCallbacks(prePutContext);
285 PreMutationCachingResult preMutationCachingResult =
286 entityCachingStrategy.prePut(this, entityList);
288 List<IndexedItem<Key>> indexedKeysToSkip = Lists.newArrayList();
289 Set<Key> mutationKeysToSkip = preMutationCachingResult.getMutationKeysToSkip();
290 List<Entity> entitiesToPut;
291 if (!mutationKeysToSkip.isEmpty()) {
292 entitiesToPut = Lists.newArrayList();
293 int index = 0;
294 for (Entity entity : entityList) {
295 if (mutationKeysToSkip.contains(entity.getKey())) {
296 indexedKeysToSkip.add(new IndexedItem<Key>(index++, entity.getKey()));
297 } else {
298 entitiesToPut.add(entity);
299 ++index;
302 } else {
303 entitiesToPut = ImmutableList.copyOf(entities);
306 Future<List<Key>> result = combinePutResult(doBatchPut(txn, entitiesToPut), indexedKeysToSkip);
308 if (txn == null) {
309 result =
310 entityCachingStrategy.createPostMutationFuture(result, preMutationCachingResult);
311 PutContext postPutContext = new PutContext(this, entityList);
312 result = new PostPutFuture(result, datastoreServiceConfig.getDatastoreCallbacks(),
313 postPutContext);
314 } else {
315 defaultTxnProvider.addPutEntities(txn, entityList);
317 return result;
320 private Future<List<Key>> combinePutResult(Future<List<Key>> rpcResult,
321 final List<IndexedItem<Key>> skippedKeys) {
322 if (skippedKeys.isEmpty()) {
323 return rpcResult;
326 return new FutureWrapper<List<Key>, List<Key>>(rpcResult) {
327 @Override
328 protected List<Key> wrap(List<Key> result) throws Exception {
329 List<Key> combined = Lists.newLinkedList(result);
330 for (IndexedItem<Key> indexedKey : skippedKeys) {
331 combined.add(indexedKey.index, indexedKey.item);
333 return combined;
336 @Override
337 protected Throwable convertException(Throwable cause) {
338 return cause;
343 @Override
344 public Future<Void> delete(Key... keys) {
345 return delete(Arrays.asList(keys));
348 @Override
349 public Future<Void> delete(Transaction txn, Key... keys) {
350 return delete(txn, Arrays.asList(keys));
353 @Override
354 public Future<Void> delete(final Iterable<Key> keys) {
355 return new TransactionRunner<Void>(getOrCreateTransaction()) {
356 @Override
357 protected Future<Void> runInternal(Transaction txn) {
358 return delete(txn, keys);
360 }.runWriteInTransaction();
363 @Override
364 public Future<Void> delete(Transaction txn, Iterable<Key> keys) {
365 List<Key> allKeys = keys instanceof List
366 ? (List<Key>) keys : ImmutableList.copyOf(keys);
367 DeleteContext preDeleteContext = new DeleteContext(this, allKeys);
368 datastoreServiceConfig.getDatastoreCallbacks().executePreDeleteCallbacks(preDeleteContext);
369 PreMutationCachingResult preMutationCachingResult =
370 entityCachingStrategy.preDelete(this, allKeys);
371 Future<Void> result = null;
372 Collection<Key> keysToDelete;
373 Set<Key> keysToSkip = preMutationCachingResult.getMutationKeysToSkip();
374 if (keysToSkip.isEmpty()) {
375 keysToDelete = allKeys;
376 } else {
377 Set<Key> keySet = Sets.newHashSet(allKeys);
378 keySet.removeAll(keysToSkip);
379 keysToDelete = keySet;
381 result = doBatchDelete(txn, keysToDelete);
383 if (txn == null) {
384 result = entityCachingStrategy.createPostMutationFuture(result, preMutationCachingResult);
385 result = new PostDeleteFuture(
386 result, datastoreServiceConfig.getDatastoreCallbacks(),
387 new DeleteContext(this, allKeys));
388 } else {
389 defaultTxnProvider.addDeletedKeys(txn, allKeys);
391 return result;
394 @Override
395 public Collection<Transaction> getActiveTransactions() {
396 return defaultTxnProvider.getAll();
400 * Register the provided future with the provided txn so that we know to
401 * perform a {@link java.util.concurrent.Future#get()} before the txn is
402 * committed.
404 * @param txn The txn with which the future must be associated.
405 * @param future The future to associate with the txn.
406 * @param <T> The type of the Future
407 * @return The same future that was passed in, for caller convenience.
409 protected final <T> Future<T> registerInTransaction( Transaction txn,
410 Future<T> future) {
411 if (txn != null) {
412 defaultTxnProvider.addFuture(txn, future);
413 return new FutureHelper.TxnAwareFuture<T>(future, txn, defaultTxnProvider);
415 return future;
418 @Override
419 public Future<Transaction> beginTransaction() {
420 return beginTransaction(TransactionOptions.Builder.withDefaults());
423 @Override
424 public Future<Transaction> beginTransaction(TransactionOptions options) {
425 Transaction txn = createTransaction(options, true);
427 defaultTxnProvider.push(txn);
429 return new FutureHelper.FakeFuture<Transaction>(txn);
432 private Transaction createTransaction(TransactionOptions options, boolean isExplicit) {
433 return new TransactionImpl(apiConfig,
434 datastoreServiceConfig.getAppIdNamespace().getAppId(), defaultTxnProvider,
435 datastoreServiceConfig.getDatastoreCallbacks(), entityCachingStrategy, isExplicit,
436 doBeginTransaction(options));
439 @Override
440 public PreparedQuery prepare(Query query) {
441 return prepare(null, query);
444 @SuppressWarnings("deprecation")
445 @Override
446 public PreparedQuery prepare(Transaction txn, Query query) {
447 PreQueryContext context = new PreQueryContext(this, query);
448 datastoreServiceConfig.getDatastoreCallbacks().executePreQueryCallbacks(context);
450 query = context.getElements().get(0);
451 validateQuery(query);
452 List<MultiQueryBuilder> queriesToRun = QuerySplitHelper.splitQuery(query);
453 query.setFilter(null);
454 query.getFilterPredicates().clear();
455 if (queriesToRun.size() == 1 && queriesToRun.get(0).isSingleton()) {
456 query.getFilterPredicates().addAll(queriesToRun.get(0).getBaseFilters());
457 return new PreparedQueryImpl(apiConfig, datastoreServiceConfig, query, txn, queryRunner);
459 return new PreparedMultiQuery(apiConfig, datastoreServiceConfig, query, queriesToRun, txn,
460 queryRunner);
463 @Override
464 public Future<KeyRange> allocateIds(String kind, long num) {
465 return allocateIds(null, kind, num);
468 protected DatastoreType getDatastoreType() {
469 if (datastoreType == null) {
470 datastoreType = quietGet(getDatastoreAttributes()).getDatastoreType();
472 return datastoreType;
475 @Override
476 public Future<DatastoreAttributes> getDatastoreAttributes() {
477 String appId = datastoreServiceConfig.getAppIdNamespace().getAppId();
478 DatastoreAttributes attributes = new DatastoreAttributes(appId);
479 return new FutureHelper.FakeFuture<DatastoreAttributes>(attributes);