1 // Copyright 2007 Google Inc. All rights reserved.
3 package com
.google
.appengine
.api
.datastore
;
5 import com
.google
.common
.collect
.Sets
;
7 import java
.util
.Collection
;
10 import java
.util
.concurrent
.Future
;
11 import java
.util
.concurrent
.atomic
.AtomicLong
;
12 import java
.util
.logging
.Logger
;
15 * Concrete implementation of QueryResultsSource which knows how to
16 * make callbacks back into the datastore to retrieve more entities
17 * for the specified cursor.
20 abstract class BaseQueryResultsSource
<InitialResultT
, NextRequestT
, NextResultT
>
21 implements QueryResultsSource
{
24 * A common interface for working with a query result.
26 interface WrappedQueryResult
{
28 * Get the end cursor associated with the wrapped query result.
30 Cursor
getEndCursor();
33 * Get the entities included in the wrapped query result.
34 * @param projections Projections from the initial {@link Query}.
36 List
<Entity
> getEntities(Collection
<Projection
> projections
);
39 * A list of Cursor objections which correspond to the entities returned
40 * by {@link #getEntities(Collection)}.
42 List
< Cursor
> getResultCursors();
45 * Return the cursor just after the last result skipped due to an offset.
46 */ Cursor
getSkippedResultsCursor();
49 * Indices whether a {@link #makeNextCall(Object, WrappedQueryResult, Integer, Integer)} call
50 * with the wrapped object will return more results.
52 boolean hasMoreResults();
55 * The number of results skipped over due to an offset.
57 int numSkippedResults();
60 * Parses information about the indexes used in the query.
62 * @param monitoredIndexBuffer Indexes with the 'only use if required' flag set will be
63 * added to this buffer.
64 * @returns CompositeIndexes which were used in the query.
66 List
<Index
> getIndexInfo(Collection
<Index
> monitoredIndexBuffer
);
69 * Return false if the client has detected that this query result made no
70 * progress compared to the previous result. Note that the v3 API doesn't
71 * require this kind of client-side logic.
73 boolean madeProgress(WrappedQueryResult previousResult
);
76 static Logger logger
= Logger
.getLogger(BaseQueryResultsSource
.class.getName());
77 private static final int AT_LEAST_ONE
= -1;
78 private static final String DISABLE_CHUNK_SIZE_WARNING_SYS_PROP
=
79 "appengine.datastore.disableChunkSizeWarning";
80 private static final int CHUNK_SIZE_WARNING_RESULT_SET_SIZE_THRESHOLD
= 1000;
81 private static final long MAX_CHUNK_SIZE_WARNING_FREQUENCY_MS
= 1000 * 60 * 5;
82 static MonitoredIndexUsageTracker monitoredIndexUsageTracker
= new MonitoredIndexUsageTracker();
83 static final AtomicLong lastChunkSizeWarning
= new AtomicLong(0);
85 private final DatastoreCallbacks callbacks
;
86 private final int chunkSize
;
87 private final int offset
;
88 private final Transaction txn
;
89 private final Query query
;
90 private final CurrentTransactionProvider currentTransactionProvider
;
92 private Future
<NextResultT
> queryResultFuture
= null;
93 private int skippedResults
;
94 private int totalResults
= 0;
95 private List
<Index
> indexList
= null;
96 private boolean addedSkippedCursor
;
97 private final Future
<InitialResultT
> initialQueryResultFuture
;
100 * Prototype for next/continue requests.
101 * This field remains null until initialQueryResultFuture is processed.
103 private NextRequestT nextQueryPrototype
= null;
105 public BaseQueryResultsSource(DatastoreCallbacks callbacks
, FetchOptions fetchOptions
,
106 final Transaction txn
, Query query
, Future
<InitialResultT
> initialQueryResultFuture
) {
107 this.callbacks
= callbacks
;
108 this.chunkSize
= fetchOptions
.getChunkSize() != null
109 ? fetchOptions
.getChunkSize() : AT_LEAST_ONE
;
110 this.offset
= fetchOptions
.getOffset() != null ? fetchOptions
.getOffset() : 0;
113 this.currentTransactionProvider
= new CurrentTransactionProvider() {
115 public Transaction
getCurrentTransaction(Transaction defaultValue
) {
119 this.initialQueryResultFuture
= initialQueryResultFuture
;
120 this.skippedResults
= 0;
124 * Wrap an initial query result and provide a standard interface for data extraction.
126 abstract WrappedQueryResult
wrapInitialResult(InitialResultT res
);
129 * Wrap a continue query request and provide a standard interface for data extraction.
131 abstract WrappedQueryResult
wrapResult(NextResultT res
);
134 * Construct base object for continuing queries if more results are needed. This object
135 * will passed into {@link #makeNextCall(Object, WrappedQueryResult, Integer, Integer)}.
137 abstract NextRequestT
buildNextCallPrototype(InitialResultT res
);
140 * Issue a continue query request to the {@link AsyncDatastoreService}.
141 * @returns The future containing the result.
143 abstract Future
<NextResultT
> makeNextCall(NextRequestT prototype
,
144 WrappedQueryResult latestResult
, Integer fetchCountOrNull
, Integer offsetOrNull
);
147 public boolean hasMoreEntities() {
148 return (nextQueryPrototype
== null || queryResultFuture
!= null);
152 public int getNumSkipped() {
153 return skippedResults
;
157 public List
<Index
> getIndexList() {
158 if (indexList
== null) {
159 InitialResultT res
= FutureHelper
.quietGet(initialQueryResultFuture
);
160 Set
<Index
> monitoredIndexBuffer
= Sets
.newHashSet();
161 indexList
= wrapInitialResult(res
).getIndexInfo(monitoredIndexBuffer
);
162 if (!monitoredIndexBuffer
.isEmpty()) {
163 monitoredIndexUsageTracker
.addNewUsage(monitoredIndexBuffer
, query
);
170 public Cursor
loadMoreEntities(List
<Entity
> buffer
, List
<Cursor
> cursorBuffer
) {
171 return loadMoreEntities(AT_LEAST_ONE
, buffer
, cursorBuffer
);
175 public Cursor
loadMoreEntities(int numberToLoad
, List
<Entity
> buffer
, List
<Cursor
> cursorBuffer
) {
176 TransactionImpl
.ensureTxnActive(txn
);
177 if (!hasMoreEntities()) {
181 if (numberToLoad
== 0
182 && offset
<= skippedResults
) {
183 if (!addedSkippedCursor
) {
184 cursorBuffer
.add(null);
185 addedSkippedCursor
= true;
190 WrappedQueryResult res
;
191 if (nextQueryPrototype
== null) {
193 InitialResultT initialRes
= FutureHelper
.quietGet(initialQueryResultFuture
);
194 nextQueryPrototype
= buildNextCallPrototype(initialRes
);
195 res
= wrapInitialResult(initialRes
);
197 res
= wrapResult(FutureHelper
.quietGet(queryResultFuture
));
198 queryResultFuture
= null;
201 int fetchedSoFar
= processQueryResult(res
, buffer
, cursorBuffer
);
203 Integer fetchCountOrNull
= null;
204 Integer offsetOrNull
= null;
205 if (res
.hasMoreResults()) {
206 boolean setCount
= true;
207 if (numberToLoad
<= 0) {
209 if (chunkSize
!= AT_LEAST_ONE
) {
210 fetchCountOrNull
= chunkSize
;
212 if (numberToLoad
== AT_LEAST_ONE
) {
217 while (res
.hasMoreResults()
218 && (skippedResults
< offset
219 || fetchedSoFar
< numberToLoad
)) {
220 if (skippedResults
< offset
) {
221 offsetOrNull
= offset
- skippedResults
;
226 fetchCountOrNull
= Math
.max(chunkSize
, numberToLoad
- fetchedSoFar
);
228 WrappedQueryResult nextRes
= wrapResult(FutureHelper
.quietGet(
229 makeNextCall(nextQueryPrototype
, res
, fetchCountOrNull
, offsetOrNull
)));
230 if (!nextRes
.madeProgress(res
)) {
231 throw new DatastoreTimeoutException("The query was not able to make any progress.");
234 fetchedSoFar
+= processQueryResult(res
, buffer
, cursorBuffer
);
238 if (res
.hasMoreResults()) {
239 fetchCountOrNull
= chunkSize
!= AT_LEAST_ONE ? chunkSize
: null;
241 queryResultFuture
= makeNextCall(nextQueryPrototype
, res
, fetchCountOrNull
, offsetOrNull
);
243 return res
.getEndCursor();
247 * Helper function to process the query results.
249 * This function adds results to the given buffer and updates {@link
252 * @param res The {@link com.google.apphosting.datastore.DatastoreV3Pb.QueryResult} to process
253 * @param buffer the buffer to which to add results
254 * @returns The number of new results added to buffer.
256 private int processQueryResult(WrappedQueryResult res
, List
<Entity
> buffer
,
257 List
<Cursor
> cursorBuffer
) {
258 skippedResults
+= res
.numSkippedResults();
259 if (skippedResults
>= offset
&& !addedSkippedCursor
) {
260 cursorBuffer
.add(res
.getSkippedResultsCursor());
261 addedSkippedCursor
= true;
264 List
<Entity
> entityList
= res
.getEntities(query
.getProjections());
265 buffer
.addAll(entityList
);
266 cursorBuffer
.addAll(res
.getResultCursors());
267 for (Entity entity
: entityList
) {
268 callbacks
.executePostLoadCallbacks(new PostLoadContext(currentTransactionProvider
, entity
));
270 totalResults
+= entityList
.size();
271 if (chunkSize
== AT_LEAST_ONE
&& totalResults
> CHUNK_SIZE_WARNING_RESULT_SET_SIZE_THRESHOLD
272 && System
.getProperty(DISABLE_CHUNK_SIZE_WARNING_SYS_PROP
) == null) {
273 logChunkSizeWarning();
275 return entityList
.size();
278 void logChunkSizeWarning() {
279 long now
= System
.currentTimeMillis();
280 if ((now
- lastChunkSizeWarning
.get()) < MAX_CHUNK_SIZE_WARNING_FREQUENCY_MS
) {
284 "This query does not have a chunk size set in FetchOptions and has returned over "
285 + CHUNK_SIZE_WARNING_RESULT_SET_SIZE_THRESHOLD
+ " results. If result sets of this "
286 + "size are common for this query, consider setting a chunk size to improve "
287 + "performance.\n To disable this warning set the following system property in "
288 + "appengine-web.xml (the value of the property doesn't matter): '"
289 + DISABLE_CHUNK_SIZE_WARNING_SYS_PROP
+ "'");
290 lastChunkSizeWarning
.set(now
);