App Engine Java SDK version 1.9.25
[gae.git] / java / src / main / com / google / appengine / api / datastore / PreparedMultiQuery.java
blob8d895c265d47bc3ef1118be3255cd88546657c94
1 // Copyright 2009 Google Inc. All Rights Reserved.
3 package com.google.appengine.api.datastore;
5 import static com.google.common.base.Preconditions.checkArgument;
7 import com.google.appengine.api.datastore.Query.FilterPredicate;
8 import com.google.appengine.api.datastore.Query.SortPredicate;
9 import com.google.apphosting.datastore.DatastoreV3Pb.Query.Order;
10 import com.google.auto.value.AutoValue;
11 import com.google.common.base.Function;
12 import com.google.common.collect.Iterables;
13 import com.google.common.collect.Lists;
14 import com.google.common.collect.Sets;
16 import java.util.ArrayDeque;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.Comparator;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.PriorityQueue;
24 import java.util.Queue;
25 import java.util.Set;
27 import javax.annotation.Nullable;
29 /**
30 * A {@link PreparedQuery} implementation for use with {@link MultiQueryBuilder}.
32 * We run each successively generated list of filters returned by each
33 * {@link MultiQueryBuilder} as they are needed and concatenate the result.
35 * If a list of filters contains more than one entry or there are multiple
36 * {@link MultiQueryBuilder}s we build a {@link Comparator} based on the sort
37 * predicates of the base query. We then use this {@link Comparator} to produce
38 * an appropriately ordered sequence of results that contains the results from
39 * each sub-query. As each sub-query produces results that are already sorted
40 * we simply use a {@link PriorityQueue} to merge the results from the sub-query
41 * as new results are requested.
44 class PreparedMultiQuery extends BasePreparedQuery {
// Budget of sub-queries the constructor distributes across the builders when deciding how many
// sub-query iterators each builder may keep buffered.
45 static final int MAX_BUFFERED_QUERIES = 10;
// Template query: prepareQuery copies it and appends the generated filters to the copy.
47 private final Query baseQuery;
// Sources of the filter lists; each generated filter list becomes one sub-query.
48 private final List<MultiQueryBuilder> queryBuilders;
// Orders merged results; null when there is a single builder whose parallel query size is at
// most one.
49 private final EntityComparator entityComparator;
// Passed to every PreparedQueryImpl created by prepareQuery.
50 private final Transaction txn;
// Passed to every PreparedQueryImpl created by prepareQuery.
51 private final QueryRunner queryRunner;
// Property names projected by the base query when this object was constructed; empty when the
// base query had no projections.
52 private final Set<String> projected;
// Indexed like queryBuilders: how many sub-query iterators are created up front for each builder.
54 private final int[] maxBufferedIteratorsPerBuilder;
56 /**
57 * @param apiConfig the api config to use
58 * @param datastoreServiceConfig the datastore service config to use
59 * @param baseQuery the base query on which to apply generate filters filters
60 * @param queryBuilders the source of filters to use
61 * @param txn the txn in which all queries should execute, can be {@code null}
63 * @throws IllegalArgumentException if this multi-query required in memory
64 * sorting and the base query is both a keys-only query and sorted by anything
65 * other than its key.
67 PreparedMultiQuery(Query baseQuery, List<MultiQueryBuilder> queryBuilders, Transaction txn,
68 QueryRunner queryRunner) {
69 checkArgument(!queryBuilders.isEmpty());
70 checkArgument(baseQuery.getFilter() == null);
71 checkArgument(baseQuery.getFilterPredicates().isEmpty());
72 this.txn = txn;
73 this.baseQuery = baseQuery;
74 this.queryBuilders = queryBuilders;
75 this.queryRunner = queryRunner;
77 if (baseQuery.getProjections().isEmpty()) {
78 projected = Collections.emptySet();
79 } else {
80 projected = Sets.newHashSet();
81 for (Projection proj : baseQuery.getProjections()) {
82 projected.add(proj.getPropertyName());
84 if (!baseQuery.getSortPredicates().isEmpty()) {
85 Set<String> localProjected = Sets.newHashSet(projected);
86 for (SortPredicate sort : baseQuery.getSortPredicates()) {
87 if (localProjected.add(sort.getPropertyName())) {
88 baseQuery.addProjection(new PropertyProjection(sort.getPropertyName(), null));
94 if (queryBuilders.size() > 1 || queryBuilders.get(0).getParallelQuerySize() > 1) {
95 if (baseQuery.isKeysOnly()) {
96 for (SortPredicate sp : baseQuery.getSortPredicates()) {
97 if (!sp.getPropertyName().equals(Entity.KEY_RESERVED_PROPERTY)) {
98 throw new IllegalArgumentException(
99 "The provided keys-only multi-query needs to perform some "
100 + "sorting in memory. As a result, this query can only be "
101 + "sorted by the key property as this is the only property "
102 + "that is available in memory.");
106 List<SortPredicate> sortPredicates = baseQuery.getSortPredicates();
107 List<Order> orders = new ArrayList<>(sortPredicates.size());
108 for (SortPredicate sp : sortPredicates) {
109 orders.add(QueryTranslator.convertSortPredicateToPb(sp));
111 this.entityComparator = new EntityComparator(orders);
112 } else {
113 this.entityComparator = null;
116 maxBufferedIteratorsPerBuilder = new int[queryBuilders.size()];
117 int allocatableQueries = MAX_BUFFERED_QUERIES;
119 for (int i = 0; i < queryBuilders.size(); i++) {
120 ++maxBufferedIteratorsPerBuilder[i];
121 allocatableQueries -= queryBuilders.get(i).getParallelQuerySize();
124 boolean madeEmptyPass = false;
125 while (allocatableQueries > 0 && !madeEmptyPass) {
126 madeEmptyPass = true;
127 for (int i = 0; i < queryBuilders.size(); i++) {
128 if (queryBuilders.get(i).getParallelQuerySize() <= allocatableQueries) {
129 ++maxBufferedIteratorsPerBuilder[i];
130 allocatableQueries -= queryBuilders.get(i).getParallelQuerySize();
131 madeEmptyPass = false;
137 protected PreparedQuery prepareQuery(List<FilterPredicate> filters, boolean isCountQuery) {
138 Query query = new Query(baseQuery);
139 if (isCountQuery && query.getProjections().isEmpty()) {
140 query.setKeysOnly();
143 query.getFilterPredicates().addAll(filters);
144 return new PreparedQueryImpl(query, txn, queryRunner);
147 protected Object getDedupeValue(Entity entity) {
148 if (projected.isEmpty()) {
149 return KeyAndProperties.create(entity.getKey(), null);
150 } else {
151 return KeyAndProperties.create(entity.getKey(), entity.getProperties());
155 @AutoValue
156 abstract static class KeyAndProperties {
157 static KeyAndProperties create(Key key, @Nullable Map<String, Object> properties) {
158 return new AutoValue_PreparedMultiQuery_KeyAndProperties(key, properties);
161 abstract Key key();
163 @Nullable
164 abstract Map<String, Object> properties();
168 * A helper function to prepare batches of queries.
169 * @param filtersList list of the filters for each query to prepare
170 * @return a list of prepared queries
172 protected List<PreparedQuery> prepareQueries(List<List<FilterPredicate>> filtersList) {
173 List<PreparedQuery> preparedQueries = new ArrayList<PreparedQuery>(filtersList.size());
174 for (List<FilterPredicate> filters : filtersList) {
175 preparedQueries.add(prepareQuery(filters, false));
177 return preparedQueries;
181 * An iterator that will correctly process the values returned by a multiquery iterator.
183 * This iterator in some cases may not respect the provided FetchOptions.limit().
185 private class FilteredMultiQueryIterator extends AbstractIterator<Entity> {
186 private final Iterator<List<List<FilterPredicate>>> multiQueryIterator;
187 private final FetchOptions fetchOptions;
188 private final Set<Object> seenUniqueValues;
190 private Iterator<Entity> currentIterator = Collections.emptyIterator();
191 private Queue<Iterator<Entity>> queryIterBuffer;
193 public FilteredMultiQueryIterator(MultiQueryBuilder queryBuilder, FetchOptions fetchOptions,
194 Set<Object> seenUniqueValues, int numIteratorsToBuffer) {
195 this.multiQueryIterator = queryBuilder.iterator();
196 this.queryIterBuffer = new ArrayDeque<Iterator<Entity>>(numIteratorsToBuffer);
197 this.fetchOptions = fetchOptions;
198 this.seenUniqueValues = seenUniqueValues;
200 while (queryIterBuffer.size() < numIteratorsToBuffer && multiQueryIterator.hasNext()) {
201 queryIterBuffer.add(makeQueryIterator());
206 * Get the iterator for the next source query. queryIterBuffer is already filled by
207 * the {@link FilteredMultiQueryIterator} constructor.
208 * We try to refill any slots in queryIterBuffer that we free up this way to pre-warm
209 * the next query/queries.
210 * @return iterator for the next source that has results or null if there isn't another source.
212 protected Iterator<Entity> getNextIterator() {
213 while (!queryIterBuffer.isEmpty()) {
214 Iterator<Entity> result = queryIterBuffer.remove();
215 if (multiQueryIterator.hasNext()) {
216 queryIterBuffer.add(makeQueryIterator());
218 if (result.hasNext()) {
219 return result;
222 return null;
226 * Create a iterator on a source query, either directly from a single query if possible or
227 * by wrapping multiple queries that need to be mergesorted inside of a {@link HeapIterator}.
228 * @return an iterator on the next {@link MultiQueryBuilder} from queryBuilders.
230 private Iterator<Entity> makeQueryIterator() {
231 List<PreparedQuery> queries = prepareQueries(multiQueryIterator.next());
232 if (queries.size() == 1) {
233 return queries.get(0).asIterator(fetchOptions);
234 } else {
235 return makeHeapIterator(Iterables.transform(queries,
236 new Function<PreparedQuery, Iterator<Entity>>() {
237 @Override
238 public Iterator<Entity> apply(PreparedQuery input) {
239 return input.asIterator(fetchOptions);
241 }));
245 @Override
246 protected Entity computeNext() {
247 Entity result = null;
248 do {
249 if (!currentIterator.hasNext()) {
250 currentIterator = getNextIterator();
251 if (currentIterator == null) {
252 return endOfData();
255 result = currentIterator.next();
256 } while (!seenUniqueValues.add(getDedupeValue(result)));
258 if (!projected.isEmpty()) {
259 for (String prop : result.getProperties().keySet()) {
260 if (!projected.contains(prop)) {
261 result.removeProperty(prop);
265 return result;
269 static final class HeapIterator extends AbstractIterator<Entity> {
270 private final PriorityQueue<EntitySource> heap;
272 HeapIterator(PriorityQueue<EntitySource> heap) {
273 this.heap = heap;
276 @Override
277 protected Entity computeNext() {
278 Entity result;
279 result = nextResult(heap);
280 if (result == null) {
281 endOfData();
283 return result;
287 Iterator<Entity> makeHeapIterator(Iterable<Iterator<Entity>> iterators) {
288 final PriorityQueue<EntitySource> heap = new PriorityQueue<EntitySource>();
289 for (Iterator<Entity> iter : iterators) {
290 if (iter.hasNext()) {
291 heap.add(new EntitySource(entityComparator, iter));
294 return new HeapIterator(heap);
298 * Fetch the next result from the {@link PriorityQueue} and reset the
299 * datasource from which the next result was taken.
301 static Entity nextResult(PriorityQueue<EntitySource> availableEntitySources) {
302 EntitySource current = availableEntitySources.poll();
303 if (current == null) {
304 return null;
306 Entity result = current.currentEntity;
307 current.advance();
308 if (current.currentEntity != null) {
309 availableEntitySources.add(current);
310 } else {
312 return result;
316 * Data structure that we use in conjunction with the {@link PriorityQueue}.
317 * It always compares using its {@code currentEntity} field by delegating to
318 * its {@code entityComparator}.
320 static final class EntitySource implements Comparable<EntitySource> {
321 private final EntityComparator entityComparator;
322 private final Iterator<Entity> source;
323 private Entity currentEntity;
325 EntitySource(EntityComparator entityComparator, Iterator<Entity> source) {
326 this.entityComparator = entityComparator;
327 this.source = source;
328 if (!source.hasNext()) {
329 throw new IllegalArgumentException("Source iterator has no data.");
331 this.currentEntity = source.next();
334 private void advance() {
335 currentEntity = source.hasNext() ? source.next() : null;
338 @Override
339 public int compareTo(EntitySource entitySource) {
340 return entityComparator.compare(currentEntity, entitySource.currentEntity);
344 @Override
345 public Entity asSingleEntity() throws TooManyResultsException {
346 List<Entity> result = this.asList(FetchOptions.Builder.withLimit(2));
347 if (result.size() == 1) {
348 return result.get(0);
349 } else if (result.size() > 1) {
350 throw new TooManyResultsException();
351 } else {
352 return null;
356 @Override
357 public int countEntities(FetchOptions fetchOptions) {
358 FetchOptions overrideOptions = new FetchOptions(fetchOptions);
359 overrideOptions.chunkSize(Integer.MAX_VALUE);
360 if (fetchOptions.getOffset() != null) {
361 overrideOptions.clearOffset();
362 if (fetchOptions.getLimit() != null) {
363 int adjustedLimit = fetchOptions.getOffset() + fetchOptions.getLimit();
364 if (adjustedLimit < 0) {
365 overrideOptions.clearLimit();
366 } else {
367 overrideOptions.limit(adjustedLimit);
372 Set<Object> seen = Sets.newHashSet();
374 outer:
375 for (MultiQueryBuilder queryBuilder : queryBuilders) {
376 for (List<List<FilterPredicate>> filtersList : queryBuilder) {
377 for (List<FilterPredicate> filters : filtersList) {
378 PreparedQuery preparedQuery = prepareQuery(filters, true);
379 Query query = new Query(baseQuery);
380 if (query.getProjections().isEmpty()) {
381 query.setKeysOnly();
383 for (Entity entity : preparedQuery.asIterable(overrideOptions)) {
384 if (seen.add(getDedupeValue(entity)) && overrideOptions.getLimit() != null
385 && seen.size() >= overrideOptions.getLimit()) {
386 break outer;
392 return fetchOptions.getOffset() == null
393 ? seen.size() : Math.max(0, seen.size() - fetchOptions.getOffset());
396 @Override
397 public Iterator<Entity> asIterator(FetchOptions fetchOptions) {
399 if ((fetchOptions.getOffset() != null && fetchOptions.getOffset() > 0)
400 || fetchOptions.getLimit() != null) {
401 FetchOptions override = new FetchOptions(fetchOptions);
402 if (fetchOptions.getOffset() != null) {
403 override.clearOffset();
404 if (fetchOptions.getLimit() != null) {
405 int adjustedLimit = fetchOptions.getOffset() + fetchOptions.getLimit();
406 if (adjustedLimit < 0) {
407 override.clearLimit();
408 } else {
409 override.limit(adjustedLimit);
413 return new SlicingIterator<Entity>(
414 newFilteredMultiQueryIterator(override),
415 fetchOptions.getOffset(),
416 fetchOptions.getLimit());
417 } else {
418 return newFilteredMultiQueryIterator(fetchOptions);
422 private Iterator<Entity> newFilteredMultiQueryIterator(FetchOptions fetchOptions) {
423 Set<Object> dedupeSet = Sets.newHashSet();
424 if (queryBuilders.size() == 1) {
425 return new FilteredMultiQueryIterator(queryBuilders.get(0), fetchOptions, dedupeSet,
426 maxBufferedIteratorsPerBuilder[0]);
428 List<Iterator<Entity>> iterators = Lists.newArrayListWithCapacity(queryBuilders.size());
429 for (int i = 0; i < queryBuilders.size(); i++) {
430 iterators.add(new FilteredMultiQueryIterator(queryBuilders.get(i), fetchOptions, dedupeSet,
431 maxBufferedIteratorsPerBuilder[i]));
433 return makeHeapIterator(iterators);
436 @Override
437 public List<Entity> asList(FetchOptions fetchOptions) {
438 FetchOptions override = new FetchOptions(fetchOptions);
439 if (override.getChunkSize() == null) {
440 override.chunkSize(Integer.MAX_VALUE);
443 List<Entity> results = new ArrayList<Entity>();
444 Iterables.addAll(results, asIterable(override));
445 return results;
448 private static class NullQueryResult implements QueryResult {
450 public static final NullQueryResult INSTANCE = new NullQueryResult();
452 @Override
453 public List<Index> getIndexList() {
454 return null;
457 @Override
458 public Cursor getCursor() {
459 return null;
464 @Override
465 public QueryResultIterator<Entity> asQueryResultIterator(FetchOptions fetchOptions) {
466 return new QueryResultIteratorDelegator<Entity>(new NullQueryResult(),
467 asIterator(fetchOptions));
470 @Override
471 public QueryResultList<Entity> asQueryResultList(FetchOptions fetchOptions) {
472 return new QueryResultListDelegator<Entity>(NullQueryResult.INSTANCE,
473 asList(fetchOptions));