Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / datastore / PreparedMultiQuery.java
blobabc2798983df92e676e5f568daf07566956f57dc
1 // Copyright 2009 Google Inc. All Rights Reserved.
3 package com.google.appengine.api.datastore;
5 import static com.google.common.base.Preconditions.checkArgument;
7 import com.google.appengine.api.datastore.Query.FilterPredicate;
8 import com.google.appengine.api.datastore.Query.SortPredicate;
9 import com.google.apphosting.api.ApiProxy.ApiConfig;
10 import com.google.apphosting.datastore.DatastoreV3Pb.Query.Order;
11 import com.google.common.base.Function;
12 import com.google.common.base.Pair;
13 import com.google.common.collect.Iterables;
14 import com.google.common.collect.Lists;
15 import com.google.common.collect.Sets;
17 import java.util.ArrayDeque;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.Comparator;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.PriorityQueue;
24 import java.util.Queue;
25 import java.util.Set;
27 /**
28 * A {@link PreparedQuery} implementation for use with {@link MultiQueryBuilder}.
30 * We run each successively generated list of filters returned by each
31 * {@link MultiQueryBuilder} as they are needed and concatenate the result.
33 * If a list of filters contains more than one entry or there are multiple
34 * {@link MultiQueryBuilder}s we build a {@link Comparator} based on the sort
35 * predicates of the base query. We then use this {@link Comparator} to produce
36 * an appropriately ordered sequence of results that contains the results from
37 * each sub-query. As each sub-query produces results that are already sorted
38 * we simply use a {@link PriorityQueue} to merge the results from the sub-query
39 * as new results are requested.
42 class PreparedMultiQuery extends BasePreparedQuery {
43 static final int MAX_BUFFERED_QUERIES = 10;
45 private final ApiConfig apiConfig;
46 private final DatastoreServiceConfig datastoreServiceConfig;
47 private final Query baseQuery;
48 private final List<MultiQueryBuilder> queryBuilders;
49 private final EntityComparator entityComparator;
50 private final Transaction txn;
51 private final QueryRunner queryRunner;
52 private final Set<String> projected;
54 private final int[] maxBufferedIteratorsPerBuilder;
56 /**
57 * @param apiConfig the api config to use
58 * @param datastoreServiceConfig the datastore service config to use
59 * @param baseQuery the base query on which to apply generate filters filters
60 * @param queryBuilders the source of filters to use
61 * @param txn the txn in which all queries should execute, can be {@code null}
63 * @throws IllegalArgumentException if this multi-query required in memory
64 * sorting and the base query is both a keys-only query and sorted by anything
65 * other than its key.
67 PreparedMultiQuery(ApiConfig apiConfig, DatastoreServiceConfig datastoreServiceConfig,
68 Query baseQuery, List<MultiQueryBuilder> queryBuilders, Transaction txn,
69 QueryRunner queryRunner) {
70 checkArgument(!queryBuilders.isEmpty());
71 checkArgument(baseQuery.getFilter() == null);
72 checkArgument(baseQuery.getFilterPredicates().isEmpty());
73 this.apiConfig = apiConfig;
74 this.datastoreServiceConfig = datastoreServiceConfig;
75 this.txn = txn;
76 this.baseQuery = baseQuery;
77 this.queryBuilders = queryBuilders;
78 this.queryRunner = queryRunner;
80 if (baseQuery.getProjections().isEmpty()) {
81 projected = Collections.emptySet();
82 } else {
83 projected = Sets.newHashSet();
84 for (Projection proj : baseQuery.getProjections()) {
85 projected.add(proj.getPropertyName());
87 if (!baseQuery.getSortPredicates().isEmpty()) {
88 Set<String> localProjected = Sets.newHashSet(projected);
89 for (SortPredicate sort : baseQuery.getSortPredicates()) {
90 if (localProjected.add(sort.getPropertyName())) {
91 baseQuery.addProjection(new PropertyProjection(sort.getPropertyName(), null));
97 if (queryBuilders.size() > 1 || queryBuilders.get(0).getParallelQuerySize() > 1) {
98 if (baseQuery.isKeysOnly()) {
99 for (SortPredicate sp : baseQuery.getSortPredicates()) {
100 if (!sp.getPropertyName().equals(Entity.KEY_RESERVED_PROPERTY)) {
101 throw new IllegalArgumentException(
102 "The provided keys-only multi-query needs to perform some "
103 + "sorting in memory. As a result, this query can only be "
104 + "sorted by the key property as this is the only property "
105 + "that is available in memory.");
109 List<SortPredicate> sortPredicates = baseQuery.getSortPredicates();
110 List<Order> orders = new ArrayList<>(sortPredicates.size());
111 for (SortPredicate sp : sortPredicates) {
112 orders.add(QueryTranslator.convertSortPredicateToPb(sp));
114 this.entityComparator = new EntityComparator(orders);
115 } else {
116 this.entityComparator = null;
119 maxBufferedIteratorsPerBuilder = new int[queryBuilders.size()];
120 int allocatableQueries = MAX_BUFFERED_QUERIES;
122 for (int i = 0; i < queryBuilders.size(); i++) {
123 ++maxBufferedIteratorsPerBuilder[i];
124 allocatableQueries -= queryBuilders.get(i).getParallelQuerySize();
127 boolean madeEmptyPass = false;
128 while (allocatableQueries > 0 && !madeEmptyPass) {
129 madeEmptyPass = true;
130 for (int i = 0; i < queryBuilders.size(); i++) {
131 if (queryBuilders.get(i).getParallelQuerySize() <= allocatableQueries) {
132 ++maxBufferedIteratorsPerBuilder[i];
133 allocatableQueries -= queryBuilders.get(i).getParallelQuerySize();
134 madeEmptyPass = false;
140 protected PreparedQuery prepareQuery(List<FilterPredicate> filters, boolean isCountQuery) {
141 Query query = new Query(baseQuery);
142 if (isCountQuery && query.getProjections().isEmpty()) {
143 query.setKeysOnly();
146 query.getFilterPredicates().addAll(filters);
147 return new PreparedQueryImpl(apiConfig, datastoreServiceConfig, query, txn, queryRunner);
150 protected Object getDedupeValue(Entity entity) {
151 if (projected.isEmpty()) {
152 return entity.getKey();
153 } else {
154 return Pair.of(entity.getKey(), entity.getProperties());
159 * A helper function to prepare batches of queries.
160 * @param filtersList list of the filters for each query to prepare
161 * @return a list of prepared queries
163 protected List<PreparedQuery> prepareQueries(List<List<FilterPredicate>> filtersList) {
164 List<PreparedQuery> preparedQueries = new ArrayList<PreparedQuery>(filtersList.size());
165 for (List<FilterPredicate> filters : filtersList) {
166 preparedQueries.add(prepareQuery(filters, false));
168 return preparedQueries;
172 * An iterator that will correctly process the values returned by a multiquery iterator.
174 * This iterator in some cases may not respect the provided FetchOptions.limit().
176 private class FilteredMultiQueryIterator extends AbstractIterator<Entity> {
177 private final Iterator<List<List<FilterPredicate>>> multiQueryIterator;
178 private final FetchOptions fetchOptions;
179 private final Set<Object> seenUniqueValues;
181 private Iterator<Entity> currentIterator = Collections.emptyIterator();
182 private Queue<Iterator<Entity>> queryIterBuffer;
184 public FilteredMultiQueryIterator(MultiQueryBuilder queryBuilder, FetchOptions fetchOptions,
185 Set<Object> seenUniqueValues, int numIteratorsToBuffer) {
186 this.multiQueryIterator = queryBuilder.iterator();
187 this.queryIterBuffer = new ArrayDeque<Iterator<Entity>>(numIteratorsToBuffer);
188 this.fetchOptions = fetchOptions;
189 this.seenUniqueValues = seenUniqueValues;
191 while (queryIterBuffer.size() < numIteratorsToBuffer && multiQueryIterator.hasNext()) {
192 queryIterBuffer.add(makeQueryIterator());
197 * Get the iterator for the next source query. queryIterBuffer is already filled by
198 * the {@link FilteredMultiQueryIterator} constructor.
199 * We try to refill any slots in queryIterBuffer that we free up this way to pre-warm
200 * the next query/queries.
201 * @return iterator for the next source that has results or null if there isn't another source.
203 protected Iterator<Entity> getNextIterator() {
204 while (!queryIterBuffer.isEmpty()) {
205 Iterator<Entity> result = queryIterBuffer.remove();
206 if (multiQueryIterator.hasNext()) {
207 queryIterBuffer.add(makeQueryIterator());
209 if (result.hasNext()) {
210 return result;
213 return null;
217 * Create a iterator on a source query, either directly from a single query if possible or
218 * by wrapping multiple queries that need to be mergesorted inside of a {@link HeapIterator}.
219 * @return an iterator on the next {@link MultiQueryBuilder} from queryBuilders.
221 private Iterator<Entity> makeQueryIterator() {
222 List<PreparedQuery> queries = prepareQueries(multiQueryIterator.next());
223 if (queries.size() == 1) {
224 return queries.get(0).asIterator(fetchOptions);
225 } else {
226 return makeHeapIterator(Iterables.transform(queries,
227 new Function<PreparedQuery, Iterator<Entity>>() {
228 @Override
229 public Iterator<Entity> apply(PreparedQuery input) {
230 return input.asIterator(fetchOptions);
232 }));
236 @Override
237 protected Entity computeNext() {
238 Entity result = null;
239 do {
240 if (!currentIterator.hasNext()) {
241 currentIterator = getNextIterator();
242 if (currentIterator == null) {
243 return endOfData();
246 result = currentIterator.next();
247 } while (!seenUniqueValues.add(getDedupeValue(result)));
249 if (!projected.isEmpty()) {
250 for (String prop : result.getProperties().keySet()) {
251 if (!projected.contains(prop)) {
252 result.removeProperty(prop);
256 return result;
260 static final class HeapIterator extends AbstractIterator<Entity> {
261 private final PriorityQueue<EntitySource> heap;
263 HeapIterator(PriorityQueue<EntitySource> heap) {
264 this.heap = heap;
267 @Override
268 protected Entity computeNext() {
269 Entity result;
270 result = nextResult(heap);
271 if (result == null) {
272 endOfData();
274 return result;
278 Iterator<Entity> makeHeapIterator(Iterable<Iterator<Entity>> iterators) {
279 final PriorityQueue<EntitySource> heap = new PriorityQueue<EntitySource>();
280 for (Iterator<Entity> iter : iterators) {
281 if (iter.hasNext()) {
282 heap.add(new EntitySource(entityComparator, iter));
285 return new HeapIterator(heap);
289 * Fetch the next result from the {@link PriorityQueue} and reset the
290 * datasource from which the next result was taken.
292 static Entity nextResult(PriorityQueue<EntitySource> availableEntitySources) {
293 EntitySource current = availableEntitySources.poll();
294 if (current == null) {
295 return null;
297 Entity result = current.currentEntity;
298 current.advance();
299 if (current.currentEntity != null) {
300 availableEntitySources.add(current);
301 } else {
303 return result;
307 * Data structure that we use in conjunction with the {@link PriorityQueue}.
308 * It always compares using its {@code currentEntity} field by delegating to
309 * its {@code entityComparator}.
311 static final class EntitySource implements Comparable<EntitySource> {
312 private final EntityComparator entityComparator;
313 private final Iterator<Entity> source;
314 private Entity currentEntity;
316 EntitySource(EntityComparator entityComparator, Iterator<Entity> source) {
317 this.entityComparator = entityComparator;
318 this.source = source;
319 if (!source.hasNext()) {
320 throw new IllegalArgumentException("Source iterator has no data.");
322 this.currentEntity = source.next();
325 private void advance() {
326 currentEntity = source.hasNext() ? source.next() : null;
329 @Override
330 public int compareTo(EntitySource entitySource) {
331 return entityComparator.compare(currentEntity, entitySource.currentEntity);
335 @Override
336 public Entity asSingleEntity() throws TooManyResultsException {
337 List<Entity> result = this.asList(FetchOptions.Builder.withLimit(2));
338 if (result.size() == 1) {
339 return result.get(0);
340 } else if (result.size() > 1) {
341 throw new TooManyResultsException();
342 } else {
343 return null;
347 @Override
348 public int countEntities(FetchOptions fetchOptions) {
349 FetchOptions overrideOptions = new FetchOptions(fetchOptions);
350 overrideOptions.chunkSize(Integer.MAX_VALUE);
351 if (fetchOptions.getOffset() != null) {
352 overrideOptions.clearOffset();
353 if (fetchOptions.getLimit() != null) {
354 int adjustedLimit = fetchOptions.getOffset() + fetchOptions.getLimit();
355 if (adjustedLimit < 0) {
356 overrideOptions.clearLimit();
357 } else {
358 overrideOptions.limit(adjustedLimit);
363 Set<Object> seen = Sets.newHashSet();
365 outer:
366 for (MultiQueryBuilder queryBuilder : queryBuilders) {
367 for (List<List<FilterPredicate>> filtersList : queryBuilder) {
368 for (List<FilterPredicate> filters : filtersList) {
369 PreparedQuery preparedQuery = prepareQuery(filters, true);
370 Query query = new Query(baseQuery);
371 if (query.getProjections().isEmpty()) {
372 query.setKeysOnly();
374 for (Entity entity : preparedQuery.asIterable(overrideOptions)) {
375 if (seen.add(getDedupeValue(entity)) && overrideOptions.getLimit() != null
376 && seen.size() >= overrideOptions.getLimit()) {
377 break outer;
383 return fetchOptions.getOffset() == null
384 ? seen.size() : Math.max(0, seen.size() - fetchOptions.getOffset());
387 @Override
388 public Iterator<Entity> asIterator(FetchOptions fetchOptions) {
390 if ((fetchOptions.getOffset() != null && fetchOptions.getOffset() > 0)
391 || fetchOptions.getLimit() != null) {
392 FetchOptions override = new FetchOptions(fetchOptions);
393 if (fetchOptions.getOffset() != null) {
394 override.clearOffset();
395 if (fetchOptions.getLimit() != null) {
396 int adjustedLimit = fetchOptions.getOffset() + fetchOptions.getLimit();
397 if (adjustedLimit < 0) {
398 override.clearLimit();
399 } else {
400 override.limit(adjustedLimit);
404 return new SlicingIterator<Entity>(
405 newFilteredMultiQueryIterator(override),
406 fetchOptions.getOffset(),
407 fetchOptions.getLimit());
408 } else {
409 return newFilteredMultiQueryIterator(fetchOptions);
413 private Iterator<Entity> newFilteredMultiQueryIterator(FetchOptions fetchOptions) {
414 Set<Object> dedupeSet = Sets.newHashSet();
415 if (queryBuilders.size() == 1) {
416 return new FilteredMultiQueryIterator(queryBuilders.get(0), fetchOptions, dedupeSet,
417 maxBufferedIteratorsPerBuilder[0]);
419 List<Iterator<Entity>> iterators = Lists.newArrayListWithCapacity(queryBuilders.size());
420 for (int i = 0; i < queryBuilders.size(); i++) {
421 iterators.add(new FilteredMultiQueryIterator(queryBuilders.get(i), fetchOptions, dedupeSet,
422 maxBufferedIteratorsPerBuilder[i]));
424 return makeHeapIterator(iterators);
427 @Override
428 public List<Entity> asList(FetchOptions fetchOptions) {
429 FetchOptions override = new FetchOptions(fetchOptions);
430 if (override.getChunkSize() == null) {
431 override.chunkSize(Integer.MAX_VALUE);
434 List<Entity> results = new ArrayList<Entity>();
435 Iterables.addAll(results, asIterable(override));
436 return results;
439 private static class NullQueryResult implements QueryResult {
441 public static final NullQueryResult INSTANCE = new NullQueryResult();
443 @Override
444 public List<Index> getIndexList() {
445 return null;
448 @Override
449 public Cursor getCursor() {
450 return null;
455 @Override
456 public QueryResultIterator<Entity> asQueryResultIterator(FetchOptions fetchOptions) {
457 return new QueryResultIteratorDelegator<Entity>(new NullQueryResult(),
458 asIterator(fetchOptions));
461 @Override
462 public QueryResultList<Entity> asQueryResultList(FetchOptions fetchOptions) {
463 return new QueryResultListDelegator<Entity>(NullQueryResult.INSTANCE,
464 asList(fetchOptions));