Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / labs / datastore / overlay / OverlayPreparedQueryImpl.java
blob61eff6af0222726196d5e135f5b0c91ca30eba48
1 package com.google.appengine.api.labs.datastore.overlay;
3 import static com.google.appengine.api.datastore.FetchOptions.Builder.withDefaults;
4 import static com.google.appengine.api.datastore.FetchOptions.Builder.withLimit;
5 import static com.google.common.base.Preconditions.checkNotNull;
6 import static com.google.common.base.Preconditions.checkState;
8 import com.google.appengine.api.datastore.Cursor;
9 import com.google.appengine.api.datastore.DatastoreService;
10 import com.google.appengine.api.datastore.Entity;
11 import com.google.appengine.api.datastore.EntityProtoComparators.EntityProtoComparator;
12 import com.google.appengine.api.datastore.EntityTranslator;
13 import com.google.appengine.api.datastore.FetchOptions;
14 import com.google.appengine.api.datastore.Index;
15 import com.google.appengine.api.datastore.Key;
16 import com.google.appengine.api.datastore.PreparedQuery;
17 import com.google.appengine.api.datastore.Query;
18 import com.google.appengine.api.datastore.QueryResultIterable;
19 import com.google.appengine.api.datastore.QueryResultIterator;
20 import com.google.appengine.api.datastore.QueryResultList;
21 import com.google.appengine.api.datastore.Transaction;
22 import com.google.apphosting.datastore.DatastoreV3Pb;
23 import com.google.common.collect.ImmutableList;
24 import com.google.common.collect.Iterables;
25 import com.google.common.collect.Queues;
27 import java.util.Comparator;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.NoSuchElementException;
32 import java.util.Queue;
34 /**
35 * An implementation of {@link PreparedQuery} for overlay queries.
37 final class OverlayPreparedQueryImpl implements PreparedQuery {
39 private static final int COUNT_ENTITIES_LEGACY_LIMIT = 1000;
41 private final OverlayAsyncDatastoreServiceImpl overlay;
42 private final PreparedQuery preparedOverlayQuery;
43 private final PreparedQuery preparedParentQuery;
44 private final EntityComparator entityComparator;
45 private final Transaction txn;
47 /**
48 * Constructs an overlay-based {@link PreparedQuery}.
50 * @param overlay the {@link OverlayBaseDatastoreServiceImpl}
51 * @param preparedOverlayQuery the {@link PreparedQuery} on the overlay's backing Datastore
52 * @param preparedParentQuery the {@link PreparedQuery} on the parent Datastore
53 * @param txn the current transaction, if any
55 * @throws IllegalArgumentException if this multi-query required in memory sorting and the base
56 * query is both a keys-only query and sorted by anything other than its key.
58 public OverlayPreparedQueryImpl(OverlayAsyncDatastoreServiceImpl overlay, Query query,
59 PreparedQuery preparedOverlayQuery, PreparedQuery preparedParentQuery, Transaction txn) {
60 checkNotNull(query);
61 this.overlay = checkNotNull(overlay);
62 this.preparedOverlayQuery = checkNotNull(preparedOverlayQuery);
63 this.preparedParentQuery = checkNotNull(preparedParentQuery);
64 this.entityComparator = new EntityComparator(query.getSortPredicates());
65 this.txn = txn;
68 @Override
69 public List<Entity> asList(FetchOptions fetchOptions) {
70 checkNotNull(fetchOptions);
71 return asQueryResultList(fetchOptions);
74 @Override
75 public QueryResultList<Entity> asQueryResultList(FetchOptions fetchOptions) {
76 checkNotNull(fetchOptions);
77 return new LazyList(runQuery(fetchOptions));
80 @Override
81 public Iterable<Entity> asIterable(final FetchOptions fetchOptions) {
82 checkNotNull(fetchOptions);
83 return new Iterable<Entity>() {
84 @Override
85 public Iterator<Entity> iterator() {
86 return asIterator(fetchOptions);
91 @Override
92 public QueryResultIterable<Entity> asQueryResultIterable(final FetchOptions fetchOptions) {
93 checkNotNull(fetchOptions);
94 return new QueryResultIterable<Entity>() {
95 @Override
96 public QueryResultIterator<Entity> iterator() {
97 return asQueryResultIterator(fetchOptions);
102 @Override
103 public Iterable<Entity> asIterable() {
104 return asIterable(withDefaults());
107 @Override
108 public QueryResultIterable<Entity> asQueryResultIterable() {
109 return asQueryResultIterable(withDefaults());
112 @Override
113 public Iterator<Entity> asIterator(FetchOptions fetchOptions) {
114 checkNotNull(fetchOptions);
115 return asQueryResultIterator(fetchOptions);
118 @Override
119 public Iterator<Entity> asIterator() {
120 return asIterator(withDefaults());
123 @Override
124 public QueryResultIterator<Entity> asQueryResultIterator(FetchOptions fetchOptions) {
125 checkNotNull(fetchOptions);
126 return runQuery(fetchOptions);
129 @Override
130 public QueryResultIterator<Entity> asQueryResultIterator() {
131 return asQueryResultIterator(withDefaults());
134 @Override
135 public Entity asSingleEntity() throws TooManyResultsException {
136 List<Entity> entities = asList(withLimit(2));
137 if (entities.isEmpty()) {
138 return null;
139 } else if (entities.size() != 1) {
140 throw new TooManyResultsException();
142 return entities.get(0);
145 @Override
146 public int countEntities(FetchOptions fetchOptions) {
147 checkNotNull(fetchOptions);
148 return Iterables.size(asIterable(fetchOptions));
151 @Override
152 public int countEntities() {
153 return countEntities(withDefaults().limit(COUNT_ENTITIES_LEGACY_LIMIT));
156 static DatastoreV3Pb.Query.Order convertSortPredicateToPb(Query.SortPredicate predicate) {
157 checkNotNull(predicate);
158 DatastoreV3Pb.Query.Order order = new DatastoreV3Pb.Query.Order();
159 order.setProperty(predicate.getPropertyName());
160 order.setDirection(getSortOp(predicate.getDirection()));
161 return order;
164 private static DatastoreV3Pb.Query.Order.Direction getSortOp(Query.SortDirection direction) {
165 switch (direction) {
166 case ASCENDING:
167 return DatastoreV3Pb.Query.Order.Direction.ASCENDING;
168 case DESCENDING:
169 return DatastoreV3Pb.Query.Order.Direction.DESCENDING;
170 default:
171 throw new IllegalArgumentException("direction: " + direction);
175 private OverlayQueryResultIteratorImpl runQuery(FetchOptions fetchOptions) {
176 checkNotNull(fetchOptions);
177 FetchOptions overlayFetchOptions = cloneFetchOptionsPrefetchAndChunkSize(fetchOptions);
178 FetchOptions parentFetchOptions = cloneFetchOptionsPrefetchAndChunkSize(fetchOptions);
179 QueryResultIterator<Entity> overlayIterator =
180 preparedOverlayQuery.asQueryResultIterator(overlayFetchOptions);
181 QueryResultIterator<Entity> parentIterator =
182 preparedParentQuery.asQueryResultIterator(parentFetchOptions);
183 return new OverlayQueryResultIteratorImpl(overlayIterator, parentIterator, fetchOptions);
186 private FetchOptions cloneFetchOptionsPrefetchAndChunkSize(FetchOptions fetchOptions) {
187 checkNotNull(fetchOptions);
188 FetchOptions clonedOptions = withDefaults();
189 Integer prefetchSize = fetchOptions.getPrefetchSize();
190 if (prefetchSize != null) {
191 clonedOptions.prefetchSize(prefetchSize);
193 Integer chunkSize = fetchOptions.getChunkSize();
194 if (chunkSize != null) {
195 clonedOptions.chunkSize(chunkSize);
197 return clonedOptions;
200 private FetchOptions cloneFetchOptions(FetchOptions fetchOptions) {
201 checkNotNull(fetchOptions);
202 FetchOptions clonedOptions = cloneFetchOptionsPrefetchAndChunkSize(fetchOptions);
203 Cursor startCursor = fetchOptions.getStartCursor();
204 if (startCursor != null) {
205 clonedOptions.startCursor(startCursor);
207 Cursor endCursor = fetchOptions.getEndCursor();
208 if (endCursor != null) {
209 clonedOptions.endCursor(endCursor);
211 Integer limit = fetchOptions.getLimit();
212 if (limit != null) {
213 clonedOptions.limit(limit);
215 Integer offset = fetchOptions.getOffset();
216 if (offset != null) {
217 clonedOptions.offset(offset);
219 return clonedOptions;
223 * A comparator for {@link Entity} instances that delegates to an {@link EntityProtoComparator}.
225 private static final class EntityComparator implements Comparator<Entity> {
226 private final EntityProtoComparator delegate;
228 EntityComparator(List<Query.SortPredicate> sortPreds) {
229 checkNotNull(sortPreds);
230 delegate = new EntityProtoComparator(
231 sortPredicatesToOrders(sortPreds));
234 private static List<DatastoreV3Pb.Query.Order> sortPredicatesToOrders(
235 List<Query.SortPredicate> sortPreds) {
236 checkNotNull(sortPreds);
237 ImmutableList.Builder<DatastoreV3Pb.Query.Order> orders =
238 ImmutableList.<DatastoreV3Pb.Query.Order>builder();
239 for (Query.SortPredicate sp : sortPreds) {
240 orders.add(convertSortPredicateToPb(sp));
242 return orders.build();
245 @Override
246 public int compare( Entity e1, Entity e2) {
247 return delegate.compare(EntityTranslator.convertToPb(e1), EntityTranslator.convertToPb(e2));
252 * An implementation of {@link QueryResultIterator} for use with an overlay-based
253 * {@link DatastoreService}.
255 * <p>We run the input query on the parent, and then run it again on the overlay. We then merge
256 * the two queries. As with {@code PreparedMultiQuery}, we take advantage of the fact that the
257 * results from each query are already sorted (if a sort order was specified), and we interleave
258 * the results from the two queries. (If no sort order was specified, then the order of results is
259 * implementation-defined, anyway.)
261 * <p>We maintain a combined queue of entities from both sources, from which we return entities to
262 * the user. We also maintain separate queues for the overlay and parent queries. When the user
263 * asks for more entities, we first check to see if the combined queue can satisfy their request.
264 * If so, then we're done. If not, then we need to pull more elements from the component queues.
265 * The merge algorithm is iterative, and works something like this:
267 * <ul>
268 * <li>If either of the component queues is empty, refill it (in {@code chunkSize} chunks).
269 * Make sure that any "invalid" entities are removed. For the overlay query, that means that
270 * tombstones should be filtered out. For the parent query, that means that if the overlay also
271 * has an entity with the same key as a given parent entity, then the parent entity should be
272 * filtered out. This work is done in batches to the greatest extent possible.
274 * <li>Once both component queues have some entities, merge them, by continually taking the
275 * entity that sorts first (from whichever queue) and appending it to the end of the combined
276 * queue.
278 * <li>In the event that one of the queries is completely depleted, just keep adding entities
279 * from the remaining query.
280 * </ul>
282 private final class OverlayQueryResultIteratorImpl implements QueryResultBatchIterator<Entity> {
283 private final QueryResultIterator<Entity> overlayIterator;
284 private final QueryResultIterator<Entity> parentIterator;
285 private final FetchOptions fetchOptions;
286 private Queue<Entity> combinedEntityQueue;
287 private Queue<Entity> overlayEntityQueue;
288 private Queue<Entity> parentEntityQueue;
289 private Integer remainingLimit;
290 private int remainingOffset;
293 * Constructs an overlay-based {@link QueryResultIterator<Entity>}.
295 * @param overlayIterator the iterator for the query on the overlay's backing Datastore
296 * @param parentIterator the iterator for the query on the parent Datastore
297 * @param fetchOptions the fetch options to apply
299 private OverlayQueryResultIteratorImpl(QueryResultIterator<Entity> overlayIterator,
300 QueryResultIterator<Entity> parentIterator, FetchOptions fetchOptions) {
301 this.overlayIterator = checkNotNull(overlayIterator);
302 this.parentIterator = checkNotNull(parentIterator);
303 this.fetchOptions = checkNotNull(fetchOptions);
304 combinedEntityQueue = Queues.newArrayDeque();
305 overlayEntityQueue = Queues.newArrayDeque();
306 parentEntityQueue = Queues.newArrayDeque();
307 this.remainingLimit = fetchOptions.getLimit();
308 Integer offset = fetchOptions.getOffset();
309 if (offset != null) {
310 this.remainingOffset = offset;
312 if (fetchOptions.getPrefetchSize() != null) {
313 ensureLoaded(fetchOptions.getPrefetchSize());
317 @Override
318 public boolean hasNext() {
319 return ensureLoaded(1) >= 1;
322 @Override
323 public Entity next() {
324 if (!hasNext()) {
325 throw new NoSuchElementException();
328 Entity next = takeNext();
329 return next;
332 @Override
333 public List<Entity> nextList(int maximumElements) {
334 ImmutableList.Builder<Entity> builder = ImmutableList.<Entity>builder();
335 for (int i = 0; i < maximumElements; i++) {
336 if (!hasNext()) {
337 break;
339 builder.add(next());
341 return builder.build();
344 @Override
345 public List<Index> getIndexList() {
346 return parentIterator.getIndexList();
349 @Override
350 public Cursor getCursor() {
351 return null;
354 @Override
355 public void remove() {
356 throw new UnsupportedOperationException("QueryResultIterator does not support remove");
360 * Takes and returns the next entity from the iterator, assuming that there is such an entity.
362 private Entity takeNext() {
363 return combinedEntityQueue.poll();
367 * Returns true if the overlay query has not yet been exhausted.
369 private boolean overlayQueryHasMore() {
370 return (!overlayEntityQueue.isEmpty() || overlayIterator.hasNext());
374 * Returns true if the parent query has not yet been exhausted.
376 private boolean parentQueryHasMore() {
377 return (!parentEntityQueue.isEmpty() || parentIterator.hasNext());
381 * Returns the chunk size, with a default of 1 if no chunk size was specified.
383 private int getChunkSize() {
384 Integer chunkSize = fetchOptions.getChunkSize();
385 return chunkSize != null ? chunkSize : 1;
389 * Refills the overlay entity queue from the overlay query iterator.
391 private void refillOverlayEntityQueue() {
392 checkState(overlayEntityQueue.isEmpty());
393 for (int i = 0; i < getChunkSize(); i++) {
394 if (!overlayIterator.hasNext()) {
395 break;
397 Entity e = overlayIterator.next();
398 if (!OverlayUtils.isTombstone(e)) {
399 overlayEntityQueue.add(e);
405 * Refills the parent entity queue from the parent query iterator.
407 private void refillParentEntityQueue() {
408 checkState(parentEntityQueue.isEmpty());
409 for (int i = 0; i < getChunkSize(); i++) {
410 if (!parentIterator.hasNext()) {
411 break;
413 parentEntityQueue.add(parentIterator.next());
416 Map<Key, Entity> overlayResults = overlay.getFromOverlayOnly(
417 txn, OverlayUtils.getKeysAndTombstoneKeysForEntities(parentEntityQueue));
418 Iterator<Entity> iterator = parentEntityQueue.iterator();
419 while (iterator.hasNext()) {
420 Entity entity = iterator.next();
421 if (overlayResults.containsKey(entity.getKey())
422 || overlayResults.containsKey(OverlayUtils.getTombstoneKey(entity.getKey()))) {
423 iterator.remove();
429 * Merges elements from the overlay and parent queues into a single entity queue, respecting any
430 * sort orders.
432 * @param numEntities the maximum number of entities that should be in the entity queue when
433 * this method completes.
435 private void mergeEntityQueues(int numEntities) {
436 if (overlayEntityQueue.isEmpty()) {
437 checkState(!overlayIterator.hasNext());
438 if (parentEntityQueue.isEmpty()) {
439 checkState(!parentIterator.hasNext());
440 } else {
441 while (!parentEntityQueue.isEmpty()) {
442 enqueueEntity(parentEntityQueue.poll());
445 return;
446 } else if (parentEntityQueue.isEmpty()) {
447 checkState(!parentIterator.hasNext());
448 while (!overlayEntityQueue.isEmpty()) {
449 enqueueEntity(overlayEntityQueue.poll());
451 return;
454 for (int i = combinedEntityQueue.size(); i < numEntities; i++) {
455 if (overlayEntityQueue.isEmpty() || parentEntityQueue.isEmpty()) {
456 return;
459 int result = entityComparator.compare(overlayEntityQueue.peek(), parentEntityQueue.peek());
460 if (result < 0) {
461 enqueueEntity(overlayEntityQueue.poll());
462 } else {
463 enqueueEntity(parentEntityQueue.poll());
469 * Adds the given entity to the end of the entity queue, and adjusts the limit and offset
470 * counters accordingly.
472 private void enqueueEntity(Entity e) {
473 if (remainingOffset > 0) {
474 remainingOffset--;
475 return;
478 combinedEntityQueue.add(e);
480 if (remainingLimit != null) {
481 remainingLimit--;
482 if (remainingLimit == 0) {
483 overlayEntityQueue.clear();
484 parentEntityQueue.clear();
490 * Requests additional {@code Entity} instances so that there are at least {@code numEntities}
491 * entities available (or both iterators are exhausted). If there is a limit, stops requesting
492 * entities once the limit has been reached. Does not take the offset into account.
494 * @param numEntities the number of entities that should be in the entity queue when this
495 * method completes
496 * @return the number of entities that are now available
498 private int ensureLoaded(int numEntities) {
499 if (remainingLimit == null || remainingLimit > 0) {
500 while (combinedEntityQueue.size() < numEntities) {
501 if (!overlayQueryHasMore() && !parentQueryHasMore()) {
502 return combinedEntityQueue.size();
504 while (overlayEntityQueue.isEmpty() && overlayIterator.hasNext()) {
505 refillOverlayEntityQueue();
507 while (parentEntityQueue.isEmpty() && parentIterator.hasNext()) {
508 refillParentEntityQueue();
510 mergeEntityQueues(numEntities);
513 return combinedEntityQueue.size();