1 // Copyright 2010 Google Inc. All Rights Reserved.
3 package com
.google
.appengine
.api
.datastore
;
5 import static com
.google
.appengine
.api
.datastore
.FutureHelper
.quietGet
;
6 import static com
.google
.common
.base
.Preconditions
.checkArgument
;
8 import com
.google
.appengine
.api
.datastore
.Batcher
.IndexedItem
;
9 import com
.google
.appengine
.api
.datastore
.DatastoreAttributes
.DatastoreType
;
10 import com
.google
.appengine
.api
.datastore
.EntityCachingStrategy
.NoOpEntityCachingStrategy
;
11 import com
.google
.appengine
.api
.datastore
.EntityCachingStrategy
.PreGetCachingResult
;
12 import com
.google
.appengine
.api
.datastore
.EntityCachingStrategy
.PreMutationCachingResult
;
13 import com
.google
.appengine
.api
.utils
.FutureWrapper
;
14 import com
.google
.apphosting
.api
.ApiProxy
;
15 import com
.google
.common
.collect
.ImmutableList
;
16 import com
.google
.common
.collect
.Lists
;
17 import com
.google
.common
.collect
.Sets
;
18 import com
.google
.protobuf
.Message
;
19 import com
.google
.protobuf
.MessageLite
;
20 import com
.google
.protobuf
.MessageLiteOrBuilder
;
22 import java
.util
.ArrayList
;
23 import java
.util
.Arrays
;
24 import java
.util
.Collection
;
25 import java
.util
.HashMap
;
26 import java
.util
.Iterator
;
27 import java
.util
.List
;
30 import java
.util
.concurrent
.Future
;
31 import java
.util
.logging
.Logger
;
34 * State and behavior that is common to all asynchronous Datastore API implementations.
37 abstract class BaseAsyncDatastoreServiceImpl
38 implements AsyncDatastoreServiceInternal
, CurrentTransactionProvider
{
40 * It doesn't actually matter what this value is, the back end will set its
41 * own deadline. All that matters is that we set a value.
43 static final long ARBITRARY_FAILOVER_READ_MS
= -1;
46 * User-provided config options.
48 final DatastoreServiceConfig datastoreServiceConfig
;
51 * Config that we'll pass to all api calls.
53 final ApiProxy
.ApiConfig apiConfig
;
56 * Knows which transaction to use when the user does not explicitly provide
59 final TransactionStack defaultTxnProvider
;
61 EntityCachingStrategy entityCachingStrategy
;
63 final Logger logger
= Logger
.getLogger(getClass().getName());
65 private DatastoreType datastoreType
;
67 private final QueryRunner queryRunner
;
70 * A base batcher for operations executed in the context of a {@link DatastoreService}.
71 * @param <S> the response message type
72 * @param <R> the request message type
73 * @param <F> the Java specific representation of a value
74 * @param <T> the proto representation of value
76 abstract class BaseRpcBatcher
<S
extends Message
, R
extends MessageLiteOrBuilder
, F
,
77 T
extends MessageLite
> extends Batcher
<R
, F
, T
> {
78 abstract Future
<S
> makeCall(R batch
);
81 final int getMaxSize() {
82 return datastoreServiceConfig
.maxRpcSizeBytes
;
86 final int getMaxGroups() {
87 return datastoreServiceConfig
.maxEntityGroupsPerRpc
;
90 final List
<Future
<S
>> makeCalls(Iterator
<R
> batches
) {
91 List
<Future
<S
>> futures
= new ArrayList
<Future
<S
>>();
92 while (batches
.hasNext()) {
93 futures
.add(makeCall(batches
.next()));
99 BaseAsyncDatastoreServiceImpl(DatastoreServiceConfig datastoreServiceConfig
,
100 TransactionStack defaultTxnProvider
, QueryRunner queryRunner
) {
101 this.datastoreServiceConfig
= datastoreServiceConfig
;
102 this.apiConfig
= createApiConfig(datastoreServiceConfig
);
103 this.defaultTxnProvider
= defaultTxnProvider
;
104 if (datastoreServiceConfig
.getEntityCacheConfig() == null) {
105 entityCachingStrategy
= NoOpEntityCachingStrategy
.INSTANCE
;
107 entityCachingStrategy
= EntityCachingStrategy
.createStrategy(datastoreServiceConfig
);
109 this.queryRunner
= queryRunner
;
112 protected abstract TransactionImpl
.InternalTransaction
113 doBeginTransaction(TransactionOptions options
);
115 protected abstract Future
<Map
<Key
, Entity
>> doBatchGet( Transaction txn
,
116 final Set
<Key
> keysToGet
, final Map
<Key
, Entity
> resultMap
);
118 protected abstract Future
<List
<Key
>> doBatchPut( Transaction txn
,
119 final List
<Entity
> entities
);
121 protected abstract Future
<Void
> doBatchDelete( Transaction txn
,
122 Collection
<Key
> keys
);
124 private ApiProxy
.ApiConfig
createApiConfig(DatastoreServiceConfig config
) {
125 ApiProxy
.ApiConfig apiConfig
= new ApiProxy
.ApiConfig();
126 apiConfig
.setDeadlineInSeconds(config
.getDeadline());
130 @SuppressWarnings("deprecation")
131 static void validateQuery(Query query
) {
132 checkArgument(query
.getFilterPredicates().isEmpty() || query
.getFilter() == null,
133 "A query cannot have both a filter and filter predicates set.");
134 checkArgument(query
.getProjections().isEmpty() || !query
.isKeysOnly(),
135 "A query cannot have both projections and keys-only set.");
139 * Return the current transaction if one already exists, otherwise create
140 * a new transaction or throw an exception according to the
141 * {@link ImplicitTransactionManagementPolicy}.
143 GetOrCreateTransactionResult
getOrCreateTransaction() {
144 Transaction currentTxn
= getCurrentTransaction(null);
145 if (currentTxn
!= null) {
146 return new GetOrCreateTransactionResult(false, currentTxn
);
149 switch(datastoreServiceConfig
.getImplicitTransactionManagementPolicy()) {
151 return new GetOrCreateTransactionResult(false, null);
153 return new GetOrCreateTransactionResult(true, createTransaction(
154 TransactionOptions
.Builder
.withDefaults(), false));
156 final String msg
= "Unexpected Transaction Creation Policy: "
157 + datastoreServiceConfig
.getImplicitTransactionManagementPolicy();
159 throw new IllegalArgumentException(msg
);
164 public Transaction
getCurrentTransaction() {
165 return defaultTxnProvider
.peek();
169 public Transaction
getCurrentTransaction(Transaction returnedIfNoTxn
) {
170 return defaultTxnProvider
.peek(returnedIfNoTxn
);
173 DatastoreServiceConfig
getDatastoreServiceConfig() {
174 return datastoreServiceConfig
;
178 public Future
<Entity
> get(Key key
) {
180 throw new NullPointerException("key cannot be null");
182 return wrapSingleGet(key
, get(Arrays
.asList(key
)));
186 public Future
<Entity
> get( Transaction txn
, final Key key
) {
188 throw new NullPointerException("key cannot be null");
190 return wrapSingleGet(key
, get(txn
, Arrays
.asList(key
)));
193 private Future
<Entity
> wrapSingleGet(final Key key
, Future
<Map
<Key
, Entity
>> futureEntities
) {
194 return new FutureWrapper
<Map
<Key
, Entity
>, Entity
>(futureEntities
) {
196 protected Entity
wrap(Map
<Key
, Entity
> entities
) throws Exception
{
197 Entity entity
= entities
.get(key
);
198 if (entity
== null) {
199 throw new EntityNotFoundException(key
);
205 protected Throwable
convertException(Throwable cause
) {
212 public Future
<Map
<Key
, Entity
>> get(final Iterable
<Key
> keys
) {
213 return new TransactionRunner
<Map
<Key
, Entity
>>(getOrCreateTransaction()) {
215 protected Future
<Map
<Key
, Entity
>> runInternal(Transaction txn
) {
216 return get(txn
, keys
);
218 }.runReadInTransaction();
222 public Future
<Map
<Key
, Entity
>> get(Transaction txn
, Iterable
<Key
> keys
) {
224 throw new NullPointerException("keys cannot be null");
227 List
<Key
> keyList
= Lists
.newArrayList(keys
);
229 Map
<Key
, Entity
> resultMap
= new HashMap
<Key
, Entity
>();
230 PreGetContext preGetContext
= new PreGetContext(this, keyList
, resultMap
);
231 datastoreServiceConfig
.getDatastoreCallbacks().executePreGetCallbacks(preGetContext
);
233 keyList
.removeAll(resultMap
.keySet());
235 PreGetCachingResult preGetCachingResult
=
236 entityCachingStrategy
.preGet(this, keyList
, resultMap
);
237 keyList
.removeAll(preGetCachingResult
.getKeysToSkipLoading());
239 Future
<Map
<Key
, Entity
>> result
= doBatchGet(txn
, Sets
.newLinkedHashSet(keyList
), resultMap
);
241 result
= entityCachingStrategy
.createPostGetFuture(result
, preGetCachingResult
);
242 return new PostLoadFuture(result
, datastoreServiceConfig
.getDatastoreCallbacks(), this);
246 public Future
<Key
> put(Entity entity
) {
247 return wrapSinglePut(put(Arrays
.asList(entity
)));
251 public Future
<Key
> put(Transaction txn
, Entity entity
) {
252 return wrapSinglePut(put(txn
, Arrays
.asList(entity
)));
255 private Future
<Key
> wrapSinglePut(Future
<List
<Key
>> futureKeys
) {
256 return new FutureWrapper
<List
<Key
>, Key
>(futureKeys
) {
258 protected Key
wrap(List
<Key
> keys
) throws Exception
{
263 protected Throwable
convertException(Throwable cause
) {
270 public Future
<List
<Key
>> put(final Iterable
<Entity
> entities
) {
271 return new TransactionRunner
<List
<Key
>>(getOrCreateTransaction()) {
273 protected Future
<List
<Key
>> runInternal(Transaction txn
) {
274 return put(txn
, entities
);
276 }.runWriteInTransaction();
280 public Future
<List
<Key
>> put( Transaction txn
, Iterable
<Entity
> entities
) {
281 List
<Entity
> entityList
= entities
instanceof List
282 ?
(List
<Entity
>) entities
: Lists
.newArrayList(entities
);
283 PutContext prePutContext
= new PutContext(this, entityList
);
284 datastoreServiceConfig
.getDatastoreCallbacks().executePrePutCallbacks(prePutContext
);
285 PreMutationCachingResult preMutationCachingResult
=
286 entityCachingStrategy
.prePut(this, entityList
);
288 List
<IndexedItem
<Key
>> indexedKeysToSkip
= Lists
.newArrayList();
289 Set
<Key
> mutationKeysToSkip
= preMutationCachingResult
.getMutationKeysToSkip();
290 List
<Entity
> entitiesToPut
;
291 if (!mutationKeysToSkip
.isEmpty()) {
292 entitiesToPut
= Lists
.newArrayList();
294 for (Entity entity
: entityList
) {
295 if (mutationKeysToSkip
.contains(entity
.getKey())) {
296 indexedKeysToSkip
.add(new IndexedItem
<Key
>(index
++, entity
.getKey()));
298 entitiesToPut
.add(entity
);
303 entitiesToPut
= ImmutableList
.copyOf(entities
);
306 Future
<List
<Key
>> result
= combinePutResult(doBatchPut(txn
, entitiesToPut
), indexedKeysToSkip
);
310 entityCachingStrategy
.createPostMutationFuture(result
, preMutationCachingResult
);
311 PutContext postPutContext
= new PutContext(this, entityList
);
312 result
= new PostPutFuture(result
, datastoreServiceConfig
.getDatastoreCallbacks(),
315 defaultTxnProvider
.addPutEntities(txn
, entityList
);
320 private Future
<List
<Key
>> combinePutResult(Future
<List
<Key
>> rpcResult
,
321 final List
<IndexedItem
<Key
>> skippedKeys
) {
322 if (skippedKeys
.isEmpty()) {
326 return new FutureWrapper
<List
<Key
>, List
<Key
>>(rpcResult
) {
328 protected List
<Key
> wrap(List
<Key
> result
) throws Exception
{
329 List
<Key
> combined
= Lists
.newLinkedList(result
);
330 for (IndexedItem
<Key
> indexedKey
: skippedKeys
) {
331 combined
.add(indexedKey
.index
, indexedKey
.item
);
337 protected Throwable
convertException(Throwable cause
) {
344 public Future
<Void
> delete(Key
... keys
) {
345 return delete(Arrays
.asList(keys
));
349 public Future
<Void
> delete(Transaction txn
, Key
... keys
) {
350 return delete(txn
, Arrays
.asList(keys
));
354 public Future
<Void
> delete(final Iterable
<Key
> keys
) {
355 return new TransactionRunner
<Void
>(getOrCreateTransaction()) {
357 protected Future
<Void
> runInternal(Transaction txn
) {
358 return delete(txn
, keys
);
360 }.runWriteInTransaction();
364 public Future
<Void
> delete(Transaction txn
, Iterable
<Key
> keys
) {
365 List
<Key
> allKeys
= keys
instanceof List
366 ?
(List
<Key
>) keys
: ImmutableList
.copyOf(keys
);
367 DeleteContext preDeleteContext
= new DeleteContext(this, allKeys
);
368 datastoreServiceConfig
.getDatastoreCallbacks().executePreDeleteCallbacks(preDeleteContext
);
369 PreMutationCachingResult preMutationCachingResult
=
370 entityCachingStrategy
.preDelete(this, allKeys
);
371 Future
<Void
> result
= null;
372 Collection
<Key
> keysToDelete
;
373 Set
<Key
> keysToSkip
= preMutationCachingResult
.getMutationKeysToSkip();
374 if (keysToSkip
.isEmpty()) {
375 keysToDelete
= allKeys
;
377 Set
<Key
> keySet
= Sets
.newHashSet(allKeys
);
378 keySet
.removeAll(keysToSkip
);
379 keysToDelete
= keySet
;
381 result
= doBatchDelete(txn
, keysToDelete
);
384 result
= entityCachingStrategy
.createPostMutationFuture(result
, preMutationCachingResult
);
385 result
= new PostDeleteFuture(
386 result
, datastoreServiceConfig
.getDatastoreCallbacks(),
387 new DeleteContext(this, allKeys
));
389 defaultTxnProvider
.addDeletedKeys(txn
, allKeys
);
395 public Collection
<Transaction
> getActiveTransactions() {
396 return defaultTxnProvider
.getAll();
400 * Register the provided future with the provided txn so that we know to
401 * perform a {@link java.util.concurrent.Future#get()} before the txn is
404 * @param txn The txn with which the future must be associated.
405 * @param future The future to associate with the txn.
406 * @param <T> The type of the Future
407 * @return The same future that was passed in, for caller convenience.
409 protected final <T
> Future
<T
> registerInTransaction( Transaction txn
,
412 defaultTxnProvider
.addFuture(txn
, future
);
413 return new FutureHelper
.TxnAwareFuture
<T
>(future
, txn
, defaultTxnProvider
);
419 public Future
<Transaction
> beginTransaction() {
420 return beginTransaction(TransactionOptions
.Builder
.withDefaults());
424 public Future
<Transaction
> beginTransaction(TransactionOptions options
) {
425 Transaction txn
= createTransaction(options
, true);
427 defaultTxnProvider
.push(txn
);
429 return new FutureHelper
.FakeFuture
<Transaction
>(txn
);
432 private Transaction
createTransaction(TransactionOptions options
, boolean isExplicit
) {
433 return new TransactionImpl(apiConfig
,
434 datastoreServiceConfig
.getAppIdNamespace().getAppId(), defaultTxnProvider
,
435 datastoreServiceConfig
.getDatastoreCallbacks(), entityCachingStrategy
, isExplicit
,
436 doBeginTransaction(options
));
440 public PreparedQuery
prepare(Query query
) {
441 return prepare(null, query
);
444 @SuppressWarnings("deprecation")
446 public PreparedQuery
prepare(Transaction txn
, Query query
) {
447 PreQueryContext context
= new PreQueryContext(this, query
);
448 datastoreServiceConfig
.getDatastoreCallbacks().executePreQueryCallbacks(context
);
450 query
= context
.getElements().get(0);
451 validateQuery(query
);
452 List
<MultiQueryBuilder
> queriesToRun
= QuerySplitHelper
.splitQuery(query
);
453 query
.setFilter(null);
454 query
.getFilterPredicates().clear();
455 if (queriesToRun
.size() == 1 && queriesToRun
.get(0).isSingleton()) {
456 query
.getFilterPredicates().addAll(queriesToRun
.get(0).getBaseFilters());
457 return new PreparedQueryImpl(apiConfig
, datastoreServiceConfig
, query
, txn
, queryRunner
);
459 return new PreparedMultiQuery(apiConfig
, datastoreServiceConfig
, query
, queriesToRun
, txn
,
464 public Future
<KeyRange
> allocateIds(String kind
, long num
) {
465 return allocateIds(null, kind
, num
);
468 protected DatastoreType
getDatastoreType() {
469 if (datastoreType
== null) {
470 datastoreType
= quietGet(getDatastoreAttributes()).getDatastoreType();
472 return datastoreType
;
476 public Future
<DatastoreAttributes
> getDatastoreAttributes() {
477 String appId
= datastoreServiceConfig
.getAppIdNamespace().getAppId();
478 DatastoreAttributes attributes
= new DatastoreAttributes(appId
);
479 return new FutureHelper
.FakeFuture
<DatastoreAttributes
>(attributes
);