1 package com
.google
.appengine
.api
.labs
.datastore
.overlay
;
3 import static com
.google
.appengine
.api
.datastore
.FetchOptions
.Builder
.withDefaults
;
4 import static com
.google
.appengine
.api
.datastore
.FetchOptions
.Builder
.withLimit
;
5 import static com
.google
.common
.base
.Preconditions
.checkNotNull
;
6 import static com
.google
.common
.base
.Preconditions
.checkState
;
8 import com
.google
.appengine
.api
.datastore
.Cursor
;
9 import com
.google
.appengine
.api
.datastore
.DatastoreService
;
10 import com
.google
.appengine
.api
.datastore
.Entity
;
11 import com
.google
.appengine
.api
.datastore
.EntityProtoComparators
.EntityProtoComparator
;
12 import com
.google
.appengine
.api
.datastore
.EntityTranslator
;
13 import com
.google
.appengine
.api
.datastore
.FetchOptions
;
14 import com
.google
.appengine
.api
.datastore
.Index
;
15 import com
.google
.appengine
.api
.datastore
.Key
;
16 import com
.google
.appengine
.api
.datastore
.PreparedQuery
;
17 import com
.google
.appengine
.api
.datastore
.Query
;
18 import com
.google
.appengine
.api
.datastore
.QueryResultIterable
;
19 import com
.google
.appengine
.api
.datastore
.QueryResultIterator
;
20 import com
.google
.appengine
.api
.datastore
.QueryResultList
;
21 import com
.google
.appengine
.api
.datastore
.Transaction
;
22 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
;
23 import com
.google
.common
.collect
.ImmutableList
;
24 import com
.google
.common
.collect
.Iterables
;
25 import com
.google
.common
.collect
.Queues
;
27 import java
.util
.Comparator
;
28 import java
.util
.Iterator
;
29 import java
.util
.List
;
31 import java
.util
.NoSuchElementException
;
32 import java
.util
.Queue
;
35 * An implementation of {@link PreparedQuery} for overlay queries.
37 final class OverlayPreparedQueryImpl
implements PreparedQuery
{
39 private static final int COUNT_ENTITIES_LEGACY_LIMIT
= 1000;
41 private final OverlayAsyncDatastoreServiceImpl overlay
;
42 private final PreparedQuery preparedOverlayQuery
;
43 private final PreparedQuery preparedParentQuery
;
44 private final EntityComparator entityComparator
;
45 private final Transaction txn
;
48 * Constructs an overlay-based {@link PreparedQuery}.
50 * @param overlay the {@link OverlayBaseDatastoreServiceImpl}
51 * @param preparedOverlayQuery the {@link PreparedQuery} on the overlay's backing Datastore
52 * @param preparedParentQuery the {@link PreparedQuery} on the parent Datastore
53 * @param txn the current transaction, if any
55 * @throws IllegalArgumentException if this multi-query required in memory sorting and the base
56 * query is both a keys-only query and sorted by anything other than its key.
58 public OverlayPreparedQueryImpl(OverlayAsyncDatastoreServiceImpl overlay
, Query query
,
59 PreparedQuery preparedOverlayQuery
, PreparedQuery preparedParentQuery
, Transaction txn
) {
61 this.overlay
= checkNotNull(overlay
);
62 this.preparedOverlayQuery
= checkNotNull(preparedOverlayQuery
);
63 this.preparedParentQuery
= checkNotNull(preparedParentQuery
);
64 this.entityComparator
= new EntityComparator(query
.getSortPredicates());
69 public List
<Entity
> asList(FetchOptions fetchOptions
) {
70 checkNotNull(fetchOptions
);
71 return asQueryResultList(fetchOptions
);
75 public QueryResultList
<Entity
> asQueryResultList(FetchOptions fetchOptions
) {
76 checkNotNull(fetchOptions
);
77 return new LazyList(runQuery(fetchOptions
));
81 public Iterable
<Entity
> asIterable(final FetchOptions fetchOptions
) {
82 checkNotNull(fetchOptions
);
83 return new Iterable
<Entity
>() {
85 public Iterator
<Entity
> iterator() {
86 return asIterator(fetchOptions
);
92 public QueryResultIterable
<Entity
> asQueryResultIterable(final FetchOptions fetchOptions
) {
93 checkNotNull(fetchOptions
);
94 return new QueryResultIterable
<Entity
>() {
96 public QueryResultIterator
<Entity
> iterator() {
97 return asQueryResultIterator(fetchOptions
);
103 public Iterable
<Entity
> asIterable() {
104 return asIterable(withDefaults());
108 public QueryResultIterable
<Entity
> asQueryResultIterable() {
109 return asQueryResultIterable(withDefaults());
113 public Iterator
<Entity
> asIterator(FetchOptions fetchOptions
) {
114 checkNotNull(fetchOptions
);
115 return asQueryResultIterator(fetchOptions
);
119 public Iterator
<Entity
> asIterator() {
120 return asIterator(withDefaults());
124 public QueryResultIterator
<Entity
> asQueryResultIterator(FetchOptions fetchOptions
) {
125 checkNotNull(fetchOptions
);
126 return runQuery(fetchOptions
);
130 public QueryResultIterator
<Entity
> asQueryResultIterator() {
131 return asQueryResultIterator(withDefaults());
135 public Entity
asSingleEntity() throws TooManyResultsException
{
136 List
<Entity
> entities
= asList(withLimit(2));
137 if (entities
.isEmpty()) {
139 } else if (entities
.size() != 1) {
140 throw new TooManyResultsException();
142 return entities
.get(0);
146 public int countEntities(FetchOptions fetchOptions
) {
147 checkNotNull(fetchOptions
);
148 return Iterables
.size(asIterable(fetchOptions
));
152 public int countEntities() {
153 return countEntities(withDefaults().limit(COUNT_ENTITIES_LEGACY_LIMIT
));
156 static DatastoreV3Pb
.Query
.Order
convertSortPredicateToPb(Query
.SortPredicate predicate
) {
157 checkNotNull(predicate
);
158 DatastoreV3Pb
.Query
.Order order
= new DatastoreV3Pb
.Query
.Order();
159 order
.setProperty(predicate
.getPropertyName());
160 order
.setDirection(getSortOp(predicate
.getDirection()));
164 private static DatastoreV3Pb
.Query
.Order
.Direction
getSortOp(Query
.SortDirection direction
) {
167 return DatastoreV3Pb
.Query
.Order
.Direction
.ASCENDING
;
169 return DatastoreV3Pb
.Query
.Order
.Direction
.DESCENDING
;
171 throw new IllegalArgumentException("direction: " + direction
);
175 private OverlayQueryResultIteratorImpl
runQuery(FetchOptions fetchOptions
) {
176 checkNotNull(fetchOptions
);
177 FetchOptions overlayFetchOptions
= cloneFetchOptionsPrefetchAndChunkSize(fetchOptions
);
178 FetchOptions parentFetchOptions
= cloneFetchOptionsPrefetchAndChunkSize(fetchOptions
);
179 QueryResultIterator
<Entity
> overlayIterator
=
180 preparedOverlayQuery
.asQueryResultIterator(overlayFetchOptions
);
181 QueryResultIterator
<Entity
> parentIterator
=
182 preparedParentQuery
.asQueryResultIterator(parentFetchOptions
);
183 return new OverlayQueryResultIteratorImpl(overlayIterator
, parentIterator
, fetchOptions
);
186 private FetchOptions
cloneFetchOptionsPrefetchAndChunkSize(FetchOptions fetchOptions
) {
187 checkNotNull(fetchOptions
);
188 FetchOptions clonedOptions
= withDefaults();
189 Integer prefetchSize
= fetchOptions
.getPrefetchSize();
190 if (prefetchSize
!= null) {
191 clonedOptions
.prefetchSize(prefetchSize
);
193 Integer chunkSize
= fetchOptions
.getChunkSize();
194 if (chunkSize
!= null) {
195 clonedOptions
.chunkSize(chunkSize
);
197 return clonedOptions
;
200 private FetchOptions
cloneFetchOptions(FetchOptions fetchOptions
) {
201 checkNotNull(fetchOptions
);
202 FetchOptions clonedOptions
= cloneFetchOptionsPrefetchAndChunkSize(fetchOptions
);
203 Cursor startCursor
= fetchOptions
.getStartCursor();
204 if (startCursor
!= null) {
205 clonedOptions
.startCursor(startCursor
);
207 Cursor endCursor
= fetchOptions
.getEndCursor();
208 if (endCursor
!= null) {
209 clonedOptions
.endCursor(endCursor
);
211 Integer limit
= fetchOptions
.getLimit();
213 clonedOptions
.limit(limit
);
215 Integer offset
= fetchOptions
.getOffset();
216 if (offset
!= null) {
217 clonedOptions
.offset(offset
);
219 return clonedOptions
;
223 * A comparator for {@link Entity} instances that delegates to an {@link EntityProtoComparator}.
225 private static final class EntityComparator
implements Comparator
<Entity
> {
226 private final EntityProtoComparator delegate
;
228 EntityComparator(List
<Query
.SortPredicate
> sortPreds
) {
229 checkNotNull(sortPreds
);
230 delegate
= new EntityProtoComparator(
231 sortPredicatesToOrders(sortPreds
));
234 private static List
<DatastoreV3Pb
.Query
.Order
> sortPredicatesToOrders(
235 List
<Query
.SortPredicate
> sortPreds
) {
236 checkNotNull(sortPreds
);
237 ImmutableList
.Builder
<DatastoreV3Pb
.Query
.Order
> orders
=
238 ImmutableList
.<DatastoreV3Pb
.Query
.Order
>builder();
239 for (Query
.SortPredicate sp
: sortPreds
) {
240 orders
.add(convertSortPredicateToPb(sp
));
242 return orders
.build();
246 public int compare( Entity e1
, Entity e2
) {
247 return delegate
.compare(EntityTranslator
.convertToPb(e1
), EntityTranslator
.convertToPb(e2
));
252 * An implementation of {@link QueryResultIterator} for use with an overlay-based
253 * {@link DatastoreService}.
255 * <p>We run the input query on the parent, and then run it again on the overlay. We then merge
256 * the two queries. As with {@code PreparedMultiQuery}, we take advantage of the fact that the
257 * results from each query are already sorted (if a sort order was specified), and we interleave
258 * the results from the two queries. (If no sort order was specified, then the order of results is
259 * implementation-defined, anyway.)
261 * <p>We maintain a combined queue of entities from both sources, from which we return entities to
262 * the user. We also maintain separate queues for the overlay and parent queries. When the user
263 * asks for more entities, we first check to see if the combined queue can satisfy their request.
264 * If so, then we're done. If not, then we need to pull more elements from the component queues.
265 * The merge algorithm is iterative, and works something like this:
268 * <li>If either of the component queues is empty, refill it (in {@code chunkSize} chunks).
269 * Make sure that any "invalid" entities are removed. For the overlay query, that means that
270 * tombstones should be filtered out. For the parent entity, that means that if the overlay also
271 * has an entity with the same key as a given parent entity, then the parent entity should be
272 * filtered out. This work is done in batches to the greatest extent possible.
274 * <li>Once both component queues have some entities, merge them, by continually taking the
275 * entity that sorts first (from whichever queue) and appending it to the end of the combined
278 * <li>In the event that one of the queries is completely depleted, just keep adding entities
279 * from the remaining query.
282 private final class OverlayQueryResultIteratorImpl
implements QueryResultBatchIterator
<Entity
> {
283 private final QueryResultIterator
<Entity
> overlayIterator
;
284 private final QueryResultIterator
<Entity
> parentIterator
;
285 private final FetchOptions fetchOptions
;
286 private Queue
<Entity
> combinedEntityQueue
;
287 private Queue
<Entity
> overlayEntityQueue
;
288 private Queue
<Entity
> parentEntityQueue
;
289 private Integer remainingLimit
;
290 private int remainingOffset
;
293 * Constructs an overlay-based {@link QueryResultIterator<Entity>}.
295 * @param overlayIterator the iterator for the query on the overlay's backing Datastore
296 * @param parentIterator the iterator for the query on the parent Datastore
297 * @param fetchOptions the fetch options to apply
299 private OverlayQueryResultIteratorImpl(QueryResultIterator
<Entity
> overlayIterator
,
300 QueryResultIterator
<Entity
> parentIterator
, FetchOptions fetchOptions
) {
301 this.overlayIterator
= checkNotNull(overlayIterator
);
302 this.parentIterator
= checkNotNull(parentIterator
);
303 this.fetchOptions
= checkNotNull(fetchOptions
);
304 combinedEntityQueue
= Queues
.newArrayDeque();
305 overlayEntityQueue
= Queues
.newArrayDeque();
306 parentEntityQueue
= Queues
.newArrayDeque();
307 this.remainingLimit
= fetchOptions
.getLimit();
308 Integer offset
= fetchOptions
.getOffset();
309 if (offset
!= null) {
310 this.remainingOffset
= offset
;
312 if (fetchOptions
.getPrefetchSize() != null) {
313 ensureLoaded(fetchOptions
.getPrefetchSize());
318 public boolean hasNext() {
319 return ensureLoaded(1) >= 1;
323 public Entity
next() {
325 throw new NoSuchElementException();
328 Entity next
= takeNext();
333 public List
<Entity
> nextList(int maximumElements
) {
334 ImmutableList
.Builder
<Entity
> builder
= ImmutableList
.<Entity
>builder();
335 for (int i
= 0; i
< maximumElements
; i
++) {
341 return builder
.build();
345 public List
<Index
> getIndexList() {
346 return parentIterator
.getIndexList();
350 public Cursor
getCursor() {
355 public void remove() {
356 throw new UnsupportedOperationException("QueryResultIterator does not support remove");
360 * Takes and returns the next entity from the iterator, assuming that there is such an entity.
362 private Entity
takeNext() {
363 return combinedEntityQueue
.poll();
367 * Returns true if the overlay query has not yet been exhausted.
369 private boolean overlayQueryHasMore() {
370 return (!overlayEntityQueue
.isEmpty() || overlayIterator
.hasNext());
374 * Returns true if the parent query has not yet been exhausted.
376 private boolean parentQueryHasMore() {
377 return (!parentEntityQueue
.isEmpty() || parentIterator
.hasNext());
381 * Returns the chunk size, with a default of 1 if no chunk size was specified.
383 private int getChunkSize() {
384 Integer chunkSize
= fetchOptions
.getChunkSize();
385 return chunkSize
!= null ? chunkSize
: 1;
389 * Refills the overlay entity queue from the overlay query iterator.
391 private void refillOverlayEntityQueue() {
392 checkState(overlayEntityQueue
.isEmpty());
393 for (int i
= 0; i
< getChunkSize(); i
++) {
394 if (!overlayIterator
.hasNext()) {
397 Entity e
= overlayIterator
.next();
398 if (!OverlayUtils
.isTombstone(e
)) {
399 overlayEntityQueue
.add(e
);
405 * Refills the parent entity queue from the parent query iterator.
407 private void refillParentEntityQueue() {
408 checkState(parentEntityQueue
.isEmpty());
409 for (int i
= 0; i
< getChunkSize(); i
++) {
410 if (!parentIterator
.hasNext()) {
413 parentEntityQueue
.add(parentIterator
.next());
416 Map
<Key
, Entity
> overlayResults
= overlay
.getFromOverlayOnly(
417 txn
, OverlayUtils
.getKeysAndTombstoneKeysForEntities(parentEntityQueue
));
418 Iterator
<Entity
> iterator
= parentEntityQueue
.iterator();
419 while (iterator
.hasNext()) {
420 Entity entity
= iterator
.next();
421 if (overlayResults
.containsKey(entity
.getKey())
422 || overlayResults
.containsKey(OverlayUtils
.getTombstoneKey(entity
.getKey()))) {
429 * Merges elements from the overlay and parent queues into a single entity queue, respecting any
432 * @param numEntities the maximum number of entities that should be in the entity queue when
433 * this method completes.
435 private void mergeEntityQueues(int numEntities
) {
436 if (overlayEntityQueue
.isEmpty()) {
437 checkState(!overlayIterator
.hasNext());
438 if (parentEntityQueue
.isEmpty()) {
439 checkState(!parentIterator
.hasNext());
441 while (!parentEntityQueue
.isEmpty()) {
442 enqueueEntity(parentEntityQueue
.poll());
446 } else if (parentEntityQueue
.isEmpty()) {
447 checkState(!parentIterator
.hasNext());
448 while (!overlayEntityQueue
.isEmpty()) {
449 enqueueEntity(overlayEntityQueue
.poll());
454 for (int i
= combinedEntityQueue
.size(); i
< numEntities
; i
++) {
455 if (overlayEntityQueue
.isEmpty() || parentEntityQueue
.isEmpty()) {
459 int result
= entityComparator
.compare(overlayEntityQueue
.peek(), parentEntityQueue
.peek());
461 enqueueEntity(overlayEntityQueue
.poll());
463 enqueueEntity(parentEntityQueue
.poll());
469 * Adds the given entity to the end of the entity queue, and adjusts the limit and offset
470 * counters accordingly.
472 private void enqueueEntity(Entity e
) {
473 if (remainingOffset
> 0) {
478 combinedEntityQueue
.add(e
);
480 if (remainingLimit
!= null) {
482 if (remainingLimit
== 0) {
483 overlayEntityQueue
.clear();
484 parentEntityQueue
.clear();
490 * Requests additional {@code Entity} instances so that there are at least {@code numEntities}
491 * entities available (or both iterators are exhausted). If there is a limit, stops requesting
492 * entities once the limit has been reached. Does not take the offset into account.
494 * @param numEntities the number of entities that should be in the entity queue when this
496 * @return the number of entities that are now available
498 private int ensureLoaded(int numEntities
) {
499 if (remainingLimit
== null || remainingLimit
> 0) {
500 while (combinedEntityQueue
.size() < numEntities
) {
501 if (!overlayQueryHasMore() && !parentQueryHasMore()) {
502 return combinedEntityQueue
.size();
504 while (overlayEntityQueue
.isEmpty() && overlayIterator
.hasNext()) {
505 refillOverlayEntityQueue();
507 while (parentEntityQueue
.isEmpty() && parentIterator
.hasNext()) {
508 refillParentEntityQueue();
510 mergeEntityQueues(numEntities
);
513 return combinedEntityQueue
.size();