1 // Copyright 2009 Google Inc. All Rights Reserved.
3 package com
.google
.appengine
.api
.datastore
;
5 import static com
.google
.common
.base
.Preconditions
.checkArgument
;
7 import com
.google
.appengine
.api
.datastore
.Query
.FilterPredicate
;
8 import com
.google
.appengine
.api
.datastore
.Query
.SortPredicate
;
9 import com
.google
.apphosting
.api
.ApiProxy
.ApiConfig
;
10 import com
.google
.apphosting
.datastore
.DatastoreV3Pb
.Query
.Order
;
11 import com
.google
.common
.base
.Function
;
12 import com
.google
.common
.base
.Pair
;
13 import com
.google
.common
.collect
.Iterables
;
14 import com
.google
.common
.collect
.Lists
;
15 import com
.google
.common
.collect
.Sets
;
17 import java
.util
.ArrayDeque
;
18 import java
.util
.ArrayList
;
19 import java
.util
.Collections
;
20 import java
.util
.Comparator
;
21 import java
.util
.Iterator
;
22 import java
.util
.List
;
23 import java
.util
.PriorityQueue
;
24 import java
.util
.Queue
;
/**
 * A {@link PreparedQuery} implementation for use with {@link MultiQueryBuilder}.
 *
 * <p>We run each successively generated list of filters returned by each
 * {@link MultiQueryBuilder} as they are needed and concatenate the results.
 *
 * <p>If a list of filters contains more than one entry, or there are multiple
 * {@link MultiQueryBuilder}s, we build a {@link Comparator} based on the sort
 * predicates of the base query. We then use this {@link Comparator} to produce
 * an appropriately ordered sequence of results that contains the results from
 * each sub-query. As each sub-query produces results that are already sorted,
 * we simply use a {@link PriorityQueue} to merge the results from the
 * sub-queries as new results are requested.
 */
42 class PreparedMultiQuery
extends BasePreparedQuery
{
43 static final int MAX_BUFFERED_QUERIES
= 10;
45 private final ApiConfig apiConfig
;
46 private final DatastoreServiceConfig datastoreServiceConfig
;
47 private final Query baseQuery
;
48 private final List
<MultiQueryBuilder
> queryBuilders
;
49 private final EntityComparator entityComparator
;
50 private final Transaction txn
;
51 private final QueryRunner queryRunner
;
52 private final Set
<String
> projected
;
54 private final int[] maxBufferedIteratorsPerBuilder
;
57 * @param apiConfig the api config to use
58 * @param datastoreServiceConfig the datastore service config to use
59 * @param baseQuery the base query on which to apply generate filters filters
60 * @param queryBuilders the source of filters to use
61 * @param txn the txn in which all queries should execute, can be {@code null}
63 * @throws IllegalArgumentException if this multi-query required in memory
64 * sorting and the base query is both a keys-only query and sorted by anything
67 PreparedMultiQuery(ApiConfig apiConfig
, DatastoreServiceConfig datastoreServiceConfig
,
68 Query baseQuery
, List
<MultiQueryBuilder
> queryBuilders
, Transaction txn
,
69 QueryRunner queryRunner
) {
70 checkArgument(!queryBuilders
.isEmpty());
71 checkArgument(baseQuery
.getFilter() == null);
72 checkArgument(baseQuery
.getFilterPredicates().isEmpty());
73 this.apiConfig
= apiConfig
;
74 this.datastoreServiceConfig
= datastoreServiceConfig
;
76 this.baseQuery
= baseQuery
;
77 this.queryBuilders
= queryBuilders
;
78 this.queryRunner
= queryRunner
;
80 if (baseQuery
.getProjections().isEmpty()) {
81 projected
= Collections
.emptySet();
83 projected
= Sets
.newHashSet();
84 for (Projection proj
: baseQuery
.getProjections()) {
85 projected
.add(proj
.getPropertyName());
87 if (!baseQuery
.getSortPredicates().isEmpty()) {
88 Set
<String
> localProjected
= Sets
.newHashSet(projected
);
89 for (SortPredicate sort
: baseQuery
.getSortPredicates()) {
90 if (localProjected
.add(sort
.getPropertyName())) {
91 baseQuery
.addProjection(new PropertyProjection(sort
.getPropertyName(), null));
97 if (queryBuilders
.size() > 1 || queryBuilders
.get(0).getParallelQuerySize() > 1) {
98 if (baseQuery
.isKeysOnly()) {
99 for (SortPredicate sp
: baseQuery
.getSortPredicates()) {
100 if (!sp
.getPropertyName().equals(Entity
.KEY_RESERVED_PROPERTY
)) {
101 throw new IllegalArgumentException(
102 "The provided keys-only multi-query needs to perform some "
103 + "sorting in memory. As a result, this query can only be "
104 + "sorted by the key property as this is the only property "
105 + "that is available in memory.");
109 List
<SortPredicate
> sortPredicates
= baseQuery
.getSortPredicates();
110 List
<Order
> orders
= new ArrayList
<>(sortPredicates
.size());
111 for (SortPredicate sp
: sortPredicates
) {
112 orders
.add(QueryTranslator
.convertSortPredicateToPb(sp
));
114 this.entityComparator
= new EntityComparator(orders
);
116 this.entityComparator
= null;
119 maxBufferedIteratorsPerBuilder
= new int[queryBuilders
.size()];
120 int allocatableQueries
= MAX_BUFFERED_QUERIES
;
122 for (int i
= 0; i
< queryBuilders
.size(); i
++) {
123 ++maxBufferedIteratorsPerBuilder
[i
];
124 allocatableQueries
-= queryBuilders
.get(i
).getParallelQuerySize();
127 boolean madeEmptyPass
= false;
128 while (allocatableQueries
> 0 && !madeEmptyPass
) {
129 madeEmptyPass
= true;
130 for (int i
= 0; i
< queryBuilders
.size(); i
++) {
131 if (queryBuilders
.get(i
).getParallelQuerySize() <= allocatableQueries
) {
132 ++maxBufferedIteratorsPerBuilder
[i
];
133 allocatableQueries
-= queryBuilders
.get(i
).getParallelQuerySize();
134 madeEmptyPass
= false;
140 protected PreparedQuery
prepareQuery(List
<FilterPredicate
> filters
, boolean isCountQuery
) {
141 Query query
= new Query(baseQuery
);
142 if (isCountQuery
&& query
.getProjections().isEmpty()) {
146 query
.getFilterPredicates().addAll(filters
);
147 return new PreparedQueryImpl(apiConfig
, datastoreServiceConfig
, query
, txn
, queryRunner
);
150 protected Object
getDedupeValue(Entity entity
) {
151 if (projected
.isEmpty()) {
152 return entity
.getKey();
154 return Pair
.of(entity
.getKey(), entity
.getProperties());
159 * A helper function to prepare batches of queries.
160 * @param filtersList list of the filters for each query to prepare
161 * @return a list of prepared queries
163 protected List
<PreparedQuery
> prepareQueries(List
<List
<FilterPredicate
>> filtersList
) {
164 List
<PreparedQuery
> preparedQueries
= new ArrayList
<PreparedQuery
>(filtersList
.size());
165 for (List
<FilterPredicate
> filters
: filtersList
) {
166 preparedQueries
.add(prepareQuery(filters
, false));
168 return preparedQueries
;
172 * An iterator that will correctly process the values returned by a multiquery iterator.
174 * This iterator in some cases may not respect the provided FetchOptions.limit().
176 private class FilteredMultiQueryIterator
extends AbstractIterator
<Entity
> {
177 private final Iterator
<List
<List
<FilterPredicate
>>> multiQueryIterator
;
178 private final FetchOptions fetchOptions
;
179 private final Set
<Object
> seenUniqueValues
;
181 private Iterator
<Entity
> currentIterator
= Collections
.emptyIterator();
182 private Queue
<Iterator
<Entity
>> queryIterBuffer
;
184 public FilteredMultiQueryIterator(MultiQueryBuilder queryBuilder
, FetchOptions fetchOptions
,
185 Set
<Object
> seenUniqueValues
, int numIteratorsToBuffer
) {
186 this.multiQueryIterator
= queryBuilder
.iterator();
187 this.queryIterBuffer
= new ArrayDeque
<Iterator
<Entity
>>(numIteratorsToBuffer
);
188 this.fetchOptions
= fetchOptions
;
189 this.seenUniqueValues
= seenUniqueValues
;
191 while (queryIterBuffer
.size() < numIteratorsToBuffer
&& multiQueryIterator
.hasNext()) {
192 queryIterBuffer
.add(makeQueryIterator());
197 * Get the iterator for the next source query. queryIterBuffer is already filled by
198 * the {@link FilteredMultiQueryIterator} constructor.
199 * We try to refill any slots in queryIterBuffer that we free up this way to pre-warm
200 * the next query/queries.
201 * @return iterator for the next source that has results or null if there isn't another source.
203 protected Iterator
<Entity
> getNextIterator() {
204 while (!queryIterBuffer
.isEmpty()) {
205 Iterator
<Entity
> result
= queryIterBuffer
.remove();
206 if (multiQueryIterator
.hasNext()) {
207 queryIterBuffer
.add(makeQueryIterator());
209 if (result
.hasNext()) {
217 * Create a iterator on a source query, either directly from a single query if possible or
218 * by wrapping multiple queries that need to be mergesorted inside of a {@link HeapIterator}.
219 * @return an iterator on the next {@link MultiQueryBuilder} from queryBuilders.
221 private Iterator
<Entity
> makeQueryIterator() {
222 List
<PreparedQuery
> queries
= prepareQueries(multiQueryIterator
.next());
223 if (queries
.size() == 1) {
224 return queries
.get(0).asIterator(fetchOptions
);
226 return makeHeapIterator(Iterables
.transform(queries
,
227 new Function
<PreparedQuery
, Iterator
<Entity
>>() {
229 public Iterator
<Entity
> apply(PreparedQuery input
) {
230 return input
.asIterator(fetchOptions
);
237 protected Entity
computeNext() {
238 Entity result
= null;
240 if (!currentIterator
.hasNext()) {
241 currentIterator
= getNextIterator();
242 if (currentIterator
== null) {
246 result
= currentIterator
.next();
247 } while (!seenUniqueValues
.add(getDedupeValue(result
)));
249 if (!projected
.isEmpty()) {
250 for (String prop
: result
.getProperties().keySet()) {
251 if (!projected
.contains(prop
)) {
252 result
.removeProperty(prop
);
260 static final class HeapIterator
extends AbstractIterator
<Entity
> {
261 private final PriorityQueue
<EntitySource
> heap
;
263 HeapIterator(PriorityQueue
<EntitySource
> heap
) {
268 protected Entity
computeNext() {
270 result
= nextResult(heap
);
271 if (result
== null) {
278 Iterator
<Entity
> makeHeapIterator(Iterable
<Iterator
<Entity
>> iterators
) {
279 final PriorityQueue
<EntitySource
> heap
= new PriorityQueue
<EntitySource
>();
280 for (Iterator
<Entity
> iter
: iterators
) {
281 if (iter
.hasNext()) {
282 heap
.add(new EntitySource(entityComparator
, iter
));
285 return new HeapIterator(heap
);
289 * Fetch the next result from the {@link PriorityQueue} and reset the
290 * datasource from which the next result was taken.
292 static Entity
nextResult(PriorityQueue
<EntitySource
> availableEntitySources
) {
293 EntitySource current
= availableEntitySources
.poll();
294 if (current
== null) {
297 Entity result
= current
.currentEntity
;
299 if (current
.currentEntity
!= null) {
300 availableEntitySources
.add(current
);
307 * Data structure that we use in conjunction with the {@link PriorityQueue}.
308 * It always compares using its {@code currentEntity} field by delegating to
309 * its {@code entityComparator}.
311 static final class EntitySource
implements Comparable
<EntitySource
> {
312 private final EntityComparator entityComparator
;
313 private final Iterator
<Entity
> source
;
314 private Entity currentEntity
;
316 EntitySource(EntityComparator entityComparator
, Iterator
<Entity
> source
) {
317 this.entityComparator
= entityComparator
;
318 this.source
= source
;
319 if (!source
.hasNext()) {
320 throw new IllegalArgumentException("Source iterator has no data.");
322 this.currentEntity
= source
.next();
325 private void advance() {
326 currentEntity
= source
.hasNext() ? source
.next() : null;
330 public int compareTo(EntitySource entitySource
) {
331 return entityComparator
.compare(currentEntity
, entitySource
.currentEntity
);
336 public Entity
asSingleEntity() throws TooManyResultsException
{
337 List
<Entity
> result
= this.asList(FetchOptions
.Builder
.withLimit(2));
338 if (result
.size() == 1) {
339 return result
.get(0);
340 } else if (result
.size() > 1) {
341 throw new TooManyResultsException();
348 public int countEntities(FetchOptions fetchOptions
) {
349 FetchOptions overrideOptions
= new FetchOptions(fetchOptions
);
350 overrideOptions
.chunkSize(Integer
.MAX_VALUE
);
351 if (fetchOptions
.getOffset() != null) {
352 overrideOptions
.clearOffset();
353 if (fetchOptions
.getLimit() != null) {
354 int adjustedLimit
= fetchOptions
.getOffset() + fetchOptions
.getLimit();
355 if (adjustedLimit
< 0) {
356 overrideOptions
.clearLimit();
358 overrideOptions
.limit(adjustedLimit
);
363 Set
<Object
> seen
= Sets
.newHashSet();
366 for (MultiQueryBuilder queryBuilder
: queryBuilders
) {
367 for (List
<List
<FilterPredicate
>> filtersList
: queryBuilder
) {
368 for (List
<FilterPredicate
> filters
: filtersList
) {
369 PreparedQuery preparedQuery
= prepareQuery(filters
, true);
370 Query query
= new Query(baseQuery
);
371 if (query
.getProjections().isEmpty()) {
374 for (Entity entity
: preparedQuery
.asIterable(overrideOptions
)) {
375 if (seen
.add(getDedupeValue(entity
)) && overrideOptions
.getLimit() != null
376 && seen
.size() >= overrideOptions
.getLimit()) {
383 return fetchOptions
.getOffset() == null
384 ? seen
.size() : Math
.max(0, seen
.size() - fetchOptions
.getOffset());
388 public Iterator
<Entity
> asIterator(FetchOptions fetchOptions
) {
390 if ((fetchOptions
.getOffset() != null && fetchOptions
.getOffset() > 0)
391 || fetchOptions
.getLimit() != null) {
392 FetchOptions override
= new FetchOptions(fetchOptions
);
393 if (fetchOptions
.getOffset() != null) {
394 override
.clearOffset();
395 if (fetchOptions
.getLimit() != null) {
396 int adjustedLimit
= fetchOptions
.getOffset() + fetchOptions
.getLimit();
397 if (adjustedLimit
< 0) {
398 override
.clearLimit();
400 override
.limit(adjustedLimit
);
404 return new SlicingIterator
<Entity
>(
405 newFilteredMultiQueryIterator(override
),
406 fetchOptions
.getOffset(),
407 fetchOptions
.getLimit());
409 return newFilteredMultiQueryIterator(fetchOptions
);
413 private Iterator
<Entity
> newFilteredMultiQueryIterator(FetchOptions fetchOptions
) {
414 Set
<Object
> dedupeSet
= Sets
.newHashSet();
415 if (queryBuilders
.size() == 1) {
416 return new FilteredMultiQueryIterator(queryBuilders
.get(0), fetchOptions
, dedupeSet
,
417 maxBufferedIteratorsPerBuilder
[0]);
419 List
<Iterator
<Entity
>> iterators
= Lists
.newArrayListWithCapacity(queryBuilders
.size());
420 for (int i
= 0; i
< queryBuilders
.size(); i
++) {
421 iterators
.add(new FilteredMultiQueryIterator(queryBuilders
.get(i
), fetchOptions
, dedupeSet
,
422 maxBufferedIteratorsPerBuilder
[i
]));
424 return makeHeapIterator(iterators
);
428 public List
<Entity
> asList(FetchOptions fetchOptions
) {
429 FetchOptions override
= new FetchOptions(fetchOptions
);
430 if (override
.getChunkSize() == null) {
431 override
.chunkSize(Integer
.MAX_VALUE
);
434 List
<Entity
> results
= new ArrayList
<Entity
>();
435 Iterables
.addAll(results
, asIterable(override
));
439 private static class NullQueryResult
implements QueryResult
{
441 public static final NullQueryResult INSTANCE
= new NullQueryResult();
444 public List
<Index
> getIndexList() {
449 public Cursor
getCursor() {
456 public QueryResultIterator
<Entity
> asQueryResultIterator(FetchOptions fetchOptions
) {
457 return new QueryResultIteratorDelegator
<Entity
>(new NullQueryResult(),
458 asIterator(fetchOptions
));
462 public QueryResultList
<Entity
> asQueryResultList(FetchOptions fetchOptions
) {
463 return new QueryResultListDelegator
<Entity
>(NullQueryResult
.INSTANCE
,
464 asList(fetchOptions
));