1 // Copyright 2009 Google Inc. All Rights Reserved.
3 package com
.google
.appengine
.api
.datastore
;
import static com.google.common.base.Preconditions.checkArgument;

import com.google.appengine.api.datastore.Query.FilterPredicate;
import com.google.appengine.api.datastore.Query.SortPredicate;
import com.google.apphosting.datastore.DatastoreV3Pb.Query.Order;
import com.google.auto.value.AutoValue;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;

import javax.annotation.Nullable;
/**
 * A {@link PreparedQuery} implementation for use with {@link MultiQueryBuilder}.
 *
 * <p>We run each successively generated list of filters returned by each
 * {@link MultiQueryBuilder} as they are needed and concatenate the result.
 *
 * <p>If a list of filters contains more than one entry or there are multiple
 * {@link MultiQueryBuilder}s we build a {@link Comparator} based on the sort
 * predicates of the base query. We then use this {@link Comparator} to produce
 * an appropriately ordered sequence of results that contains the results from
 * each sub-query. As each sub-query produces results that are already sorted
 * we simply use a {@link PriorityQueue} to merge the results from the sub-query
 * as new results are requested.
 */
44 class PreparedMultiQuery
extends BasePreparedQuery
{
45 static final int MAX_BUFFERED_QUERIES
= 10;
47 private final Query baseQuery
;
48 private final List
<MultiQueryBuilder
> queryBuilders
;
49 private final EntityComparator entityComparator
;
50 private final Transaction txn
;
51 private final QueryRunner queryRunner
;
52 private final Set
<String
> projected
;
54 private final int[] maxBufferedIteratorsPerBuilder
;
57 * @param apiConfig the api config to use
58 * @param datastoreServiceConfig the datastore service config to use
59 * @param baseQuery the base query on which to apply generate filters filters
60 * @param queryBuilders the source of filters to use
61 * @param txn the txn in which all queries should execute, can be {@code null}
63 * @throws IllegalArgumentException if this multi-query required in memory
64 * sorting and the base query is both a keys-only query and sorted by anything
67 PreparedMultiQuery(Query baseQuery
, List
<MultiQueryBuilder
> queryBuilders
, Transaction txn
,
68 QueryRunner queryRunner
) {
69 checkArgument(!queryBuilders
.isEmpty());
70 checkArgument(baseQuery
.getFilter() == null);
71 checkArgument(baseQuery
.getFilterPredicates().isEmpty());
73 this.baseQuery
= baseQuery
;
74 this.queryBuilders
= queryBuilders
;
75 this.queryRunner
= queryRunner
;
77 if (baseQuery
.getProjections().isEmpty()) {
78 projected
= Collections
.emptySet();
80 projected
= Sets
.newHashSet();
81 for (Projection proj
: baseQuery
.getProjections()) {
82 projected
.add(proj
.getPropertyName());
84 if (!baseQuery
.getSortPredicates().isEmpty()) {
85 Set
<String
> localProjected
= Sets
.newHashSet(projected
);
86 for (SortPredicate sort
: baseQuery
.getSortPredicates()) {
87 if (localProjected
.add(sort
.getPropertyName())) {
88 baseQuery
.addProjection(new PropertyProjection(sort
.getPropertyName(), null));
94 if (queryBuilders
.size() > 1 || queryBuilders
.get(0).getParallelQuerySize() > 1) {
95 if (baseQuery
.isKeysOnly()) {
96 for (SortPredicate sp
: baseQuery
.getSortPredicates()) {
97 if (!sp
.getPropertyName().equals(Entity
.KEY_RESERVED_PROPERTY
)) {
98 throw new IllegalArgumentException(
99 "The provided keys-only multi-query needs to perform some "
100 + "sorting in memory. As a result, this query can only be "
101 + "sorted by the key property as this is the only property "
102 + "that is available in memory.");
106 List
<SortPredicate
> sortPredicates
= baseQuery
.getSortPredicates();
107 List
<Order
> orders
= new ArrayList
<>(sortPredicates
.size());
108 for (SortPredicate sp
: sortPredicates
) {
109 orders
.add(QueryTranslator
.convertSortPredicateToPb(sp
));
111 this.entityComparator
= new EntityComparator(orders
);
113 this.entityComparator
= null;
116 maxBufferedIteratorsPerBuilder
= new int[queryBuilders
.size()];
117 int allocatableQueries
= MAX_BUFFERED_QUERIES
;
119 for (int i
= 0; i
< queryBuilders
.size(); i
++) {
120 ++maxBufferedIteratorsPerBuilder
[i
];
121 allocatableQueries
-= queryBuilders
.get(i
).getParallelQuerySize();
124 boolean madeEmptyPass
= false;
125 while (allocatableQueries
> 0 && !madeEmptyPass
) {
126 madeEmptyPass
= true;
127 for (int i
= 0; i
< queryBuilders
.size(); i
++) {
128 if (queryBuilders
.get(i
).getParallelQuerySize() <= allocatableQueries
) {
129 ++maxBufferedIteratorsPerBuilder
[i
];
130 allocatableQueries
-= queryBuilders
.get(i
).getParallelQuerySize();
131 madeEmptyPass
= false;
137 protected PreparedQuery
prepareQuery(List
<FilterPredicate
> filters
, boolean isCountQuery
) {
138 Query query
= new Query(baseQuery
);
139 if (isCountQuery
&& query
.getProjections().isEmpty()) {
143 query
.getFilterPredicates().addAll(filters
);
144 return new PreparedQueryImpl(query
, txn
, queryRunner
);
147 protected Object
getDedupeValue(Entity entity
) {
148 if (projected
.isEmpty()) {
149 return KeyAndProperties
.create(entity
.getKey(), null);
151 return KeyAndProperties
.create(entity
.getKey(), entity
.getProperties());
156 abstract static class KeyAndProperties
{
157 static KeyAndProperties
create(Key key
, @Nullable Map
<String
, Object
> properties
) {
158 return new AutoValue_PreparedMultiQuery_KeyAndProperties(key
, properties
);
164 abstract Map
<String
, Object
> properties();
168 * A helper function to prepare batches of queries.
169 * @param filtersList list of the filters for each query to prepare
170 * @return a list of prepared queries
172 protected List
<PreparedQuery
> prepareQueries(List
<List
<FilterPredicate
>> filtersList
) {
173 List
<PreparedQuery
> preparedQueries
= new ArrayList
<PreparedQuery
>(filtersList
.size());
174 for (List
<FilterPredicate
> filters
: filtersList
) {
175 preparedQueries
.add(prepareQuery(filters
, false));
177 return preparedQueries
;
181 * An iterator that will correctly process the values returned by a multiquery iterator.
183 * This iterator in some cases may not respect the provided FetchOptions.limit().
185 private class FilteredMultiQueryIterator
extends AbstractIterator
<Entity
> {
186 private final Iterator
<List
<List
<FilterPredicate
>>> multiQueryIterator
;
187 private final FetchOptions fetchOptions
;
188 private final Set
<Object
> seenUniqueValues
;
190 private Iterator
<Entity
> currentIterator
= Collections
.emptyIterator();
191 private Queue
<Iterator
<Entity
>> queryIterBuffer
;
193 public FilteredMultiQueryIterator(MultiQueryBuilder queryBuilder
, FetchOptions fetchOptions
,
194 Set
<Object
> seenUniqueValues
, int numIteratorsToBuffer
) {
195 this.multiQueryIterator
= queryBuilder
.iterator();
196 this.queryIterBuffer
= new ArrayDeque
<Iterator
<Entity
>>(numIteratorsToBuffer
);
197 this.fetchOptions
= fetchOptions
;
198 this.seenUniqueValues
= seenUniqueValues
;
200 while (queryIterBuffer
.size() < numIteratorsToBuffer
&& multiQueryIterator
.hasNext()) {
201 queryIterBuffer
.add(makeQueryIterator());
206 * Get the iterator for the next source query. queryIterBuffer is already filled by
207 * the {@link FilteredMultiQueryIterator} constructor.
208 * We try to refill any slots in queryIterBuffer that we free up this way to pre-warm
209 * the next query/queries.
210 * @return iterator for the next source that has results or null if there isn't another source.
212 protected Iterator
<Entity
> getNextIterator() {
213 while (!queryIterBuffer
.isEmpty()) {
214 Iterator
<Entity
> result
= queryIterBuffer
.remove();
215 if (multiQueryIterator
.hasNext()) {
216 queryIterBuffer
.add(makeQueryIterator());
218 if (result
.hasNext()) {
226 * Create a iterator on a source query, either directly from a single query if possible or
227 * by wrapping multiple queries that need to be mergesorted inside of a {@link HeapIterator}.
228 * @return an iterator on the next {@link MultiQueryBuilder} from queryBuilders.
230 private Iterator
<Entity
> makeQueryIterator() {
231 List
<PreparedQuery
> queries
= prepareQueries(multiQueryIterator
.next());
232 if (queries
.size() == 1) {
233 return queries
.get(0).asIterator(fetchOptions
);
235 return makeHeapIterator(Iterables
.transform(queries
,
236 new Function
<PreparedQuery
, Iterator
<Entity
>>() {
238 public Iterator
<Entity
> apply(PreparedQuery input
) {
239 return input
.asIterator(fetchOptions
);
246 protected Entity
computeNext() {
247 Entity result
= null;
249 if (!currentIterator
.hasNext()) {
250 currentIterator
= getNextIterator();
251 if (currentIterator
== null) {
255 result
= currentIterator
.next();
256 } while (!seenUniqueValues
.add(getDedupeValue(result
)));
258 if (!projected
.isEmpty()) {
259 for (String prop
: result
.getProperties().keySet()) {
260 if (!projected
.contains(prop
)) {
261 result
.removeProperty(prop
);
269 static final class HeapIterator
extends AbstractIterator
<Entity
> {
270 private final PriorityQueue
<EntitySource
> heap
;
272 HeapIterator(PriorityQueue
<EntitySource
> heap
) {
277 protected Entity
computeNext() {
279 result
= nextResult(heap
);
280 if (result
== null) {
287 Iterator
<Entity
> makeHeapIterator(Iterable
<Iterator
<Entity
>> iterators
) {
288 final PriorityQueue
<EntitySource
> heap
= new PriorityQueue
<EntitySource
>();
289 for (Iterator
<Entity
> iter
: iterators
) {
290 if (iter
.hasNext()) {
291 heap
.add(new EntitySource(entityComparator
, iter
));
294 return new HeapIterator(heap
);
298 * Fetch the next result from the {@link PriorityQueue} and reset the
299 * datasource from which the next result was taken.
301 static Entity
nextResult(PriorityQueue
<EntitySource
> availableEntitySources
) {
302 EntitySource current
= availableEntitySources
.poll();
303 if (current
== null) {
306 Entity result
= current
.currentEntity
;
308 if (current
.currentEntity
!= null) {
309 availableEntitySources
.add(current
);
316 * Data structure that we use in conjunction with the {@link PriorityQueue}.
317 * It always compares using its {@code currentEntity} field by delegating to
318 * its {@code entityComparator}.
320 static final class EntitySource
implements Comparable
<EntitySource
> {
321 private final EntityComparator entityComparator
;
322 private final Iterator
<Entity
> source
;
323 private Entity currentEntity
;
325 EntitySource(EntityComparator entityComparator
, Iterator
<Entity
> source
) {
326 this.entityComparator
= entityComparator
;
327 this.source
= source
;
328 if (!source
.hasNext()) {
329 throw new IllegalArgumentException("Source iterator has no data.");
331 this.currentEntity
= source
.next();
334 private void advance() {
335 currentEntity
= source
.hasNext() ? source
.next() : null;
339 public int compareTo(EntitySource entitySource
) {
340 return entityComparator
.compare(currentEntity
, entitySource
.currentEntity
);
345 public Entity
asSingleEntity() throws TooManyResultsException
{
346 List
<Entity
> result
= this.asList(FetchOptions
.Builder
.withLimit(2));
347 if (result
.size() == 1) {
348 return result
.get(0);
349 } else if (result
.size() > 1) {
350 throw new TooManyResultsException();
357 public int countEntities(FetchOptions fetchOptions
) {
358 FetchOptions overrideOptions
= new FetchOptions(fetchOptions
);
359 overrideOptions
.chunkSize(Integer
.MAX_VALUE
);
360 if (fetchOptions
.getOffset() != null) {
361 overrideOptions
.clearOffset();
362 if (fetchOptions
.getLimit() != null) {
363 int adjustedLimit
= fetchOptions
.getOffset() + fetchOptions
.getLimit();
364 if (adjustedLimit
< 0) {
365 overrideOptions
.clearLimit();
367 overrideOptions
.limit(adjustedLimit
);
372 Set
<Object
> seen
= Sets
.newHashSet();
375 for (MultiQueryBuilder queryBuilder
: queryBuilders
) {
376 for (List
<List
<FilterPredicate
>> filtersList
: queryBuilder
) {
377 for (List
<FilterPredicate
> filters
: filtersList
) {
378 PreparedQuery preparedQuery
= prepareQuery(filters
, true);
379 Query query
= new Query(baseQuery
);
380 if (query
.getProjections().isEmpty()) {
383 for (Entity entity
: preparedQuery
.asIterable(overrideOptions
)) {
384 if (seen
.add(getDedupeValue(entity
)) && overrideOptions
.getLimit() != null
385 && seen
.size() >= overrideOptions
.getLimit()) {
392 return fetchOptions
.getOffset() == null
393 ? seen
.size() : Math
.max(0, seen
.size() - fetchOptions
.getOffset());
397 public Iterator
<Entity
> asIterator(FetchOptions fetchOptions
) {
399 if ((fetchOptions
.getOffset() != null && fetchOptions
.getOffset() > 0)
400 || fetchOptions
.getLimit() != null) {
401 FetchOptions override
= new FetchOptions(fetchOptions
);
402 if (fetchOptions
.getOffset() != null) {
403 override
.clearOffset();
404 if (fetchOptions
.getLimit() != null) {
405 int adjustedLimit
= fetchOptions
.getOffset() + fetchOptions
.getLimit();
406 if (adjustedLimit
< 0) {
407 override
.clearLimit();
409 override
.limit(adjustedLimit
);
413 return new SlicingIterator
<Entity
>(
414 newFilteredMultiQueryIterator(override
),
415 fetchOptions
.getOffset(),
416 fetchOptions
.getLimit());
418 return newFilteredMultiQueryIterator(fetchOptions
);
422 private Iterator
<Entity
> newFilteredMultiQueryIterator(FetchOptions fetchOptions
) {
423 Set
<Object
> dedupeSet
= Sets
.newHashSet();
424 if (queryBuilders
.size() == 1) {
425 return new FilteredMultiQueryIterator(queryBuilders
.get(0), fetchOptions
, dedupeSet
,
426 maxBufferedIteratorsPerBuilder
[0]);
428 List
<Iterator
<Entity
>> iterators
= Lists
.newArrayListWithCapacity(queryBuilders
.size());
429 for (int i
= 0; i
< queryBuilders
.size(); i
++) {
430 iterators
.add(new FilteredMultiQueryIterator(queryBuilders
.get(i
), fetchOptions
, dedupeSet
,
431 maxBufferedIteratorsPerBuilder
[i
]));
433 return makeHeapIterator(iterators
);
437 public List
<Entity
> asList(FetchOptions fetchOptions
) {
438 FetchOptions override
= new FetchOptions(fetchOptions
);
439 if (override
.getChunkSize() == null) {
440 override
.chunkSize(Integer
.MAX_VALUE
);
443 List
<Entity
> results
= new ArrayList
<Entity
>();
444 Iterables
.addAll(results
, asIterable(override
));
448 private static class NullQueryResult
implements QueryResult
{
450 public static final NullQueryResult INSTANCE
= new NullQueryResult();
453 public List
<Index
> getIndexList() {
458 public Cursor
getCursor() {
465 public QueryResultIterator
<Entity
> asQueryResultIterator(FetchOptions fetchOptions
) {
466 return new QueryResultIteratorDelegator
<Entity
>(new NullQueryResult(),
467 asIterator(fetchOptions
));
471 public QueryResultList
<Entity
> asQueryResultList(FetchOptions fetchOptions
) {
472 return new QueryResultListDelegator
<Entity
>(NullQueryResult
.INSTANCE
,
473 asList(fetchOptions
));