1 // Copyright 2010 Google Inc. All Rights Reserved.
3 package com
.google
.appengine
.api
.datastore
;
5 import static com
.google
.appengine
.api
.datastore
.FutureHelper
.quietGet
;
6 import static com
.google
.common
.base
.Preconditions
.checkArgument
;
8 import com
.google
.appengine
.api
.datastore
.Batcher
.IndexedItem
;
9 import com
.google
.appengine
.api
.datastore
.DatastoreAttributes
.DatastoreType
;
10 import com
.google
.appengine
.api
.datastore
.EntityCachingStrategy
.NoOpEntityCachingStrategy
;
11 import com
.google
.appengine
.api
.datastore
.EntityCachingStrategy
.PreGetCachingResult
;
12 import com
.google
.appengine
.api
.datastore
.EntityCachingStrategy
.PreMutationCachingResult
;
13 import com
.google
.appengine
.api
.utils
.FutureWrapper
;
14 import com
.google
.common
.collect
.ImmutableList
;
15 import com
.google
.common
.collect
.Lists
;
16 import com
.google
.common
.collect
.Sets
;
17 import com
.google
.protobuf
.Message
;
18 import com
.google
.protobuf
.MessageLite
;
19 import com
.google
.protobuf
.MessageLiteOrBuilder
;
21 import java
.util
.ArrayList
;
22 import java
.util
.Arrays
;
23 import java
.util
.Collection
;
24 import java
.util
.HashMap
;
25 import java
.util
.Iterator
;
26 import java
.util
.List
;
29 import java
.util
.concurrent
.Future
;
30 import java
.util
.logging
.Logger
;
/**
 * State and behavior that is common to all asynchronous Datastore API implementations.
 */
36 abstract class BaseAsyncDatastoreServiceImpl
37 implements AsyncDatastoreServiceInternal
, CurrentTransactionProvider
{
39 * It doesn't actually matter what this value is, the back end will set its
40 * own deadline. All that matters is that we set a value.
42 static final long ARBITRARY_FAILOVER_READ_MS
= -1;
45 * User-provided config options.
47 final DatastoreServiceConfig datastoreServiceConfig
;
50 * Knows which transaction to use when the user does not explicitly provide
53 final TransactionStack defaultTxnProvider
;
55 EntityCachingStrategy entityCachingStrategy
;
57 final Logger logger
= Logger
.getLogger(getClass().getName());
59 private DatastoreType datastoreType
;
61 private final QueryRunner queryRunner
;
64 * A base batcher for operations executed in the context of a {@link DatastoreService}.
65 * @param <S> the response message type
66 * @param <R> the request message type
67 * @param <F> the Java specific representation of a value
68 * @param <T> the proto representation of value
70 abstract class BaseRpcBatcher
<S
extends Message
, R
extends MessageLiteOrBuilder
, F
,
71 T
extends MessageLite
> extends Batcher
<R
, F
, T
> {
72 abstract Future
<S
> makeCall(R batch
);
75 final int getMaxSize() {
76 return datastoreServiceConfig
.maxRpcSizeBytes
;
80 final int getMaxGroups() {
81 return datastoreServiceConfig
.maxEntityGroupsPerRpc
;
84 final List
<Future
<S
>> makeCalls(Iterator
<R
> batches
) {
85 List
<Future
<S
>> futures
= new ArrayList
<Future
<S
>>();
86 while (batches
.hasNext()) {
87 futures
.add(makeCall(batches
.next()));
/**
 * Stores the collaborators and picks the entity caching strategy: the no-op
 * strategy when the config has no entity cache config, otherwise the strategy
 * created from the config.
 *
 * @param datastoreServiceConfig user-provided config options
 * @param defaultTxnProvider stack consulted for the current transaction
 * @param queryRunner runner passed to the prepared queries built by this service
 */
BaseAsyncDatastoreServiceImpl(DatastoreServiceConfig datastoreServiceConfig,
    TransactionStack defaultTxnProvider, QueryRunner queryRunner) {
  this.datastoreServiceConfig = datastoreServiceConfig;
  this.defaultTxnProvider = defaultTxnProvider;
  this.queryRunner = queryRunner;
  this.entityCachingStrategy = datastoreServiceConfig.getEntityCacheConfig() == null
      ? NoOpEntityCachingStrategy.INSTANCE
      : EntityCachingStrategy.createStrategy(datastoreServiceConfig);
}
105 protected abstract TransactionImpl
.InternalTransaction
106 doBeginTransaction(TransactionOptions options
);
108 protected abstract Future
<Map
<Key
, Entity
>> doBatchGet( Transaction txn
,
109 final Set
<Key
> keysToGet
, final Map
<Key
, Entity
> resultMap
);
111 protected abstract Future
<List
<Key
>> doBatchPut( Transaction txn
,
112 final List
<Entity
> entities
);
114 protected abstract Future
<Void
> doBatchDelete( Transaction txn
,
115 Collection
<Key
> keys
);
117 @SuppressWarnings("deprecation")
118 static void validateQuery(Query query
) {
119 checkArgument(query
.getFilterPredicates().isEmpty() || query
.getFilter() == null,
120 "A query cannot have both a filter and filter predicates set.");
121 checkArgument(query
.getProjections().isEmpty() || !query
.isKeysOnly(),
122 "A query cannot have both projections and keys-only set.");
126 * Return the current transaction if one already exists, otherwise create
127 * a new transaction or throw an exception according to the
128 * {@link ImplicitTransactionManagementPolicy}.
130 GetOrCreateTransactionResult
getOrCreateTransaction() {
131 Transaction currentTxn
= getCurrentTransaction(null);
132 if (currentTxn
!= null) {
133 return new GetOrCreateTransactionResult(false, currentTxn
);
136 switch(datastoreServiceConfig
.getImplicitTransactionManagementPolicy()) {
138 return new GetOrCreateTransactionResult(false, null);
140 return new GetOrCreateTransactionResult(true, createTransaction(
141 TransactionOptions
.Builder
.withDefaults(), false));
143 final String msg
= "Unexpected Transaction Creation Policy: "
144 + datastoreServiceConfig
.getImplicitTransactionManagementPolicy();
146 throw new IllegalArgumentException(msg
);
151 public Transaction
getCurrentTransaction() {
152 return defaultTxnProvider
.peek();
156 public Transaction
getCurrentTransaction(Transaction returnedIfNoTxn
) {
157 return defaultTxnProvider
.peek(returnedIfNoTxn
);
160 DatastoreServiceConfig
getDatastoreServiceConfig() {
161 return datastoreServiceConfig
;
165 public Future
<Entity
> get(Key key
) {
167 throw new NullPointerException("key cannot be null");
169 return wrapSingleGet(key
, get(Arrays
.asList(key
)));
173 public Future
<Entity
> get( Transaction txn
, final Key key
) {
175 throw new NullPointerException("key cannot be null");
177 return wrapSingleGet(key
, get(txn
, Arrays
.asList(key
)));
180 private Future
<Entity
> wrapSingleGet(final Key key
, Future
<Map
<Key
, Entity
>> futureEntities
) {
181 return new FutureWrapper
<Map
<Key
, Entity
>, Entity
>(futureEntities
) {
183 protected Entity
wrap(Map
<Key
, Entity
> entities
) throws Exception
{
184 Entity entity
= entities
.get(key
);
185 if (entity
== null) {
186 throw new EntityNotFoundException(key
);
192 protected Throwable
convertException(Throwable cause
) {
199 public Future
<Map
<Key
, Entity
>> get(final Iterable
<Key
> keys
) {
200 return new TransactionRunner
<Map
<Key
, Entity
>>(getOrCreateTransaction()) {
202 protected Future
<Map
<Key
, Entity
>> runInternal(Transaction txn
) {
203 return get(txn
, keys
);
205 }.runReadInTransaction();
209 public Future
<Map
<Key
, Entity
>> get(Transaction txn
, Iterable
<Key
> keys
) {
211 throw new NullPointerException("keys cannot be null");
214 List
<Key
> keyList
= Lists
.newArrayList(keys
);
216 Map
<Key
, Entity
> resultMap
= new HashMap
<Key
, Entity
>();
217 PreGetContext preGetContext
= new PreGetContext(this, keyList
, resultMap
);
218 datastoreServiceConfig
.getDatastoreCallbacks().executePreGetCallbacks(preGetContext
);
220 keyList
.removeAll(resultMap
.keySet());
222 PreGetCachingResult preGetCachingResult
=
223 entityCachingStrategy
.preGet(this, keyList
, resultMap
);
224 keyList
.removeAll(preGetCachingResult
.getKeysToSkipLoading());
226 Future
<Map
<Key
, Entity
>> result
= doBatchGet(txn
, Sets
.newLinkedHashSet(keyList
), resultMap
);
228 result
= entityCachingStrategy
.createPostGetFuture(result
, preGetCachingResult
);
229 return new PostLoadFuture(result
, datastoreServiceConfig
.getDatastoreCallbacks(), this);
233 public Future
<Key
> put(Entity entity
) {
234 return wrapSinglePut(put(Arrays
.asList(entity
)));
238 public Future
<Key
> put(Transaction txn
, Entity entity
) {
239 return wrapSinglePut(put(txn
, Arrays
.asList(entity
)));
242 private Future
<Key
> wrapSinglePut(Future
<List
<Key
>> futureKeys
) {
243 return new FutureWrapper
<List
<Key
>, Key
>(futureKeys
) {
245 protected Key
wrap(List
<Key
> keys
) throws Exception
{
250 protected Throwable
convertException(Throwable cause
) {
257 public Future
<List
<Key
>> put(final Iterable
<Entity
> entities
) {
258 return new TransactionRunner
<List
<Key
>>(getOrCreateTransaction()) {
260 protected Future
<List
<Key
>> runInternal(Transaction txn
) {
261 return put(txn
, entities
);
263 }.runWriteInTransaction();
267 public Future
<List
<Key
>> put( Transaction txn
, Iterable
<Entity
> entities
) {
268 List
<Entity
> entityList
= entities
instanceof List
269 ?
(List
<Entity
>) entities
: Lists
.newArrayList(entities
);
270 PutContext prePutContext
= new PutContext(this, entityList
);
271 datastoreServiceConfig
.getDatastoreCallbacks().executePrePutCallbacks(prePutContext
);
272 PreMutationCachingResult preMutationCachingResult
=
273 entityCachingStrategy
.prePut(this, entityList
);
275 List
<IndexedItem
<Key
>> indexedKeysToSkip
= Lists
.newArrayList();
276 Set
<Key
> mutationKeysToSkip
= preMutationCachingResult
.getMutationKeysToSkip();
277 List
<Entity
> entitiesToPut
;
278 if (!mutationKeysToSkip
.isEmpty()) {
279 entitiesToPut
= Lists
.newArrayList();
281 for (Entity entity
: entityList
) {
282 if (mutationKeysToSkip
.contains(entity
.getKey())) {
283 indexedKeysToSkip
.add(new IndexedItem
<Key
>(index
++, entity
.getKey()));
285 entitiesToPut
.add(entity
);
290 entitiesToPut
= ImmutableList
.copyOf(entities
);
293 Future
<List
<Key
>> result
= combinePutResult(doBatchPut(txn
, entitiesToPut
), indexedKeysToSkip
);
297 entityCachingStrategy
.createPostMutationFuture(result
, preMutationCachingResult
);
298 PutContext postPutContext
= new PutContext(this, entityList
);
299 result
= new PostPutFuture(result
, datastoreServiceConfig
.getDatastoreCallbacks(),
302 defaultTxnProvider
.addPutEntities(txn
, entityList
);
307 private Future
<List
<Key
>> combinePutResult(Future
<List
<Key
>> rpcResult
,
308 final List
<IndexedItem
<Key
>> skippedKeys
) {
309 if (skippedKeys
.isEmpty()) {
313 return new FutureWrapper
<List
<Key
>, List
<Key
>>(rpcResult
) {
315 protected List
<Key
> wrap(List
<Key
> result
) throws Exception
{
316 List
<Key
> combined
= Lists
.newLinkedList(result
);
317 for (IndexedItem
<Key
> indexedKey
: skippedKeys
) {
318 combined
.add(indexedKey
.index
, indexedKey
.item
);
324 protected Throwable
convertException(Throwable cause
) {
331 public Future
<Void
> delete(Key
... keys
) {
332 return delete(Arrays
.asList(keys
));
336 public Future
<Void
> delete(Transaction txn
, Key
... keys
) {
337 return delete(txn
, Arrays
.asList(keys
));
341 public Future
<Void
> delete(final Iterable
<Key
> keys
) {
342 return new TransactionRunner
<Void
>(getOrCreateTransaction()) {
344 protected Future
<Void
> runInternal(Transaction txn
) {
345 return delete(txn
, keys
);
347 }.runWriteInTransaction();
351 public Future
<Void
> delete(Transaction txn
, Iterable
<Key
> keys
) {
352 List
<Key
> allKeys
= keys
instanceof List
353 ?
(List
<Key
>) keys
: ImmutableList
.copyOf(keys
);
354 DeleteContext preDeleteContext
= new DeleteContext(this, allKeys
);
355 datastoreServiceConfig
.getDatastoreCallbacks().executePreDeleteCallbacks(preDeleteContext
);
356 PreMutationCachingResult preMutationCachingResult
=
357 entityCachingStrategy
.preDelete(this, allKeys
);
358 Future
<Void
> result
= null;
359 Collection
<Key
> keysToDelete
;
360 Set
<Key
> keysToSkip
= preMutationCachingResult
.getMutationKeysToSkip();
361 if (keysToSkip
.isEmpty()) {
362 keysToDelete
= allKeys
;
364 Set
<Key
> keySet
= Sets
.newHashSet(allKeys
);
365 keySet
.removeAll(keysToSkip
);
366 keysToDelete
= keySet
;
368 result
= doBatchDelete(txn
, keysToDelete
);
371 result
= entityCachingStrategy
.createPostMutationFuture(result
, preMutationCachingResult
);
372 result
= new PostDeleteFuture(
373 result
, datastoreServiceConfig
.getDatastoreCallbacks(),
374 new DeleteContext(this, allKeys
));
376 defaultTxnProvider
.addDeletedKeys(txn
, allKeys
);
382 public Collection
<Transaction
> getActiveTransactions() {
383 return defaultTxnProvider
.getAll();
387 * Register the provided future with the provided txn so that we know to
388 * perform a {@link java.util.concurrent.Future#get()} before the txn is
391 * @param txn The txn with which the future must be associated.
392 * @param future The future to associate with the txn.
393 * @param <T> The type of the Future
394 * @return The same future that was passed in, for caller convenience.
396 protected final <T
> Future
<T
> registerInTransaction( Transaction txn
,
399 defaultTxnProvider
.addFuture(txn
, future
);
400 return new FutureHelper
.TxnAwareFuture
<T
>(future
, txn
, defaultTxnProvider
);
406 public Future
<Transaction
> beginTransaction() {
407 return beginTransaction(TransactionOptions
.Builder
.withDefaults());
411 public Future
<Transaction
> beginTransaction(TransactionOptions options
) {
412 Transaction txn
= createTransaction(options
, true);
414 defaultTxnProvider
.push(txn
);
416 return new FutureHelper
.FakeFuture
<Transaction
>(txn
);
419 private Transaction
createTransaction(TransactionOptions options
, boolean isExplicit
) {
420 return new TransactionImpl(
421 datastoreServiceConfig
.getAppIdNamespace().getAppId(), defaultTxnProvider
,
422 datastoreServiceConfig
.getDatastoreCallbacks(), entityCachingStrategy
, isExplicit
,
423 doBeginTransaction(options
));
427 public PreparedQuery
prepare(Query query
) {
428 return prepare(null, query
);
431 @SuppressWarnings("deprecation")
433 public PreparedQuery
prepare(Transaction txn
, Query query
) {
434 PreQueryContext context
= new PreQueryContext(this, query
);
435 datastoreServiceConfig
.getDatastoreCallbacks().executePreQueryCallbacks(context
);
437 query
= context
.getElements().get(0);
438 validateQuery(query
);
439 if (isGeoQuery(query
)) {
440 return new PreparedQueryImpl(query
, txn
, queryRunner
);
442 List
<MultiQueryBuilder
> queriesToRun
= QuerySplitHelper
.splitQuery(query
);
443 query
.setFilter(null);
444 query
.getFilterPredicates().clear();
445 if (queriesToRun
.size() == 1 && queriesToRun
.get(0).isSingleton()) {
446 query
.getFilterPredicates().addAll(queriesToRun
.get(0).getBaseFilters());
447 return new PreparedQueryImpl(query
, txn
, queryRunner
);
449 return new PreparedMultiQuery(query
, queriesToRun
, txn
, queryRunner
);
452 /** Determines whether the query has a geo-spatial filter. */
453 private boolean isGeoQuery(Query query
) {
454 Query
.Filter filter
= query
.getFilter();
455 if (filter
== null) {
458 return isGeoFilter(filter
);
462 * Walks the filter tree searching for a geo-spatial component.
464 * @return true, if we find an StContainsFilter, without thoroughly
465 * validating that the rest of the tree is compatible with the geo-filter.
467 private boolean isGeoFilter(Query
.Filter filter
) {
468 if (filter
instanceof Query
.StContainsFilter
) {
471 if (filter
instanceof Query
.CompositeFilter
) {
472 for (Query
.Filter f
: ((Query
.CompositeFilter
) filter
).getSubFilters()) {
473 if (isGeoFilter(f
)) {
482 public Future
<KeyRange
> allocateIds(String kind
, long num
) {
483 return allocateIds(null, kind
, num
);
486 protected DatastoreType
getDatastoreType() {
487 if (datastoreType
== null) {
488 datastoreType
= quietGet(getDatastoreAttributes()).getDatastoreType();
490 return datastoreType
;
494 public Future
<DatastoreAttributes
> getDatastoreAttributes() {
495 String appId
= datastoreServiceConfig
.getAppIdNamespace().getAppId();
496 DatastoreAttributes attributes
= new DatastoreAttributes(appId
);
497 return new FutureHelper
.FakeFuture
<DatastoreAttributes
>(attributes
);