App Engine Java SDK version 1.9.25
[gae.git] / java / src / main / com / google / appengine / api / datastore / BaseQueryResultsSource.java
blob32346816ee7687b39122de29b7020506a061b260
1 // Copyright 2007 Google Inc. All rights reserved.
3 package com.google.appengine.api.datastore;
5 import com.google.common.collect.Sets;
7 import java.util.Collection;
8 import java.util.List;
9 import java.util.Set;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.atomic.AtomicLong;
12 import java.util.logging.Logger;
/**
 * Base implementation of QueryResultsSource which knows how to
 * make callbacks back into the datastore to retrieve more entities
 * for the specified cursor.
 */
20 abstract class BaseQueryResultsSource<InitialResultT, NextRequestT, NextResultT>
21 implements QueryResultsSource {
/**
 * A common interface for working with a query result.
 */
interface WrappedQueryResult {
  /**
   * Get the end cursor associated with the wrapped query result.
   */
  Cursor getEndCursor();

  /**
   * Get the entities included in the wrapped query result.
   *
   * @param projections Projections from the initial {@link Query}.
   */
  List<Entity> getEntities(Collection<Projection> projections);

  /**
   * A list of Cursor objects which correspond to the entities returned
   * by {@link #getEntities(Collection)}.
   */
  List<Cursor> getResultCursors();

  /**
   * Return the cursor just after the last result skipped due to an offset.
   */
  Cursor getSkippedResultsCursor();

  /**
   * Indicates whether a
   * {@link BaseQueryResultsSource#makeNextCall(Object, WrappedQueryResult, Integer, Integer)}
   * call with the wrapped object will return more results.
   */
  boolean hasMoreResults();

  /**
   * The number of results skipped over due to an offset.
   */
  int numSkippedResults();

  /**
   * Parses information about the indexes used in the query.
   *
   * @param monitoredIndexBuffer Indexes with the 'only use if required' flag set will be
   *     added to this buffer.
   * @return CompositeIndexes which were used in the query.
   */
  List<Index> getIndexInfo(Collection<Index> monitoredIndexBuffer);

  /**
   * Return false if the client has detected that this query result made no
   * progress compared to the previous result. Note that the v3 API doesn't
   * require this kind of client-side logic.
   *
   * @param previousResult the result this one is compared against
   */
  boolean madeProgress(WrappedQueryResult previousResult);
}
// Logger used for the chunk-size warning.
static Logger logger = Logger.getLogger(BaseQueryResultsSource.class.getName());
// Sentinel used both as the chunk size when FetchOptions supplies none and as the
// "load at least one result" request count in loadMoreEntities.
private static final int AT_LEAST_ONE = -1;
// System property whose presence (any value) suppresses the chunk-size warning.
private static final String DISABLE_CHUNK_SIZE_WARNING_SYS_PROP =
    "appengine.datastore.disableChunkSizeWarning";
// The warning is only considered once more than this many results have been returned.
private static final int CHUNK_SIZE_WARNING_RESULT_SET_SIZE_THRESHOLD = 1000;
// Minimum interval between two chunk-size warnings: five minutes, in milliseconds.
private static final long MAX_CHUNK_SIZE_WARNING_FREQUENCY_MS = 1000 * 60 * 5;
// Receives the monitored indexes reported by the initial query result (see getIndexList).
static MonitoredIndexUsageTracker monitoredIndexUsageTracker = new MonitoredIndexUsageTracker();
// System.currentTimeMillis() value recorded when the chunk-size warning was last logged.
static final AtomicLong lastChunkSizeWarning = new AtomicLong(0);

// Post-load callbacks executed for every entity added to a result buffer.
private final DatastoreCallbacks callbacks;
// Chunk size from FetchOptions, or AT_LEAST_ONE when none was given.
private final int chunkSize;
// Offset from FetchOptions, or 0 when none was given.
private final int offset;
// Transaction the query runs in; checked for activity before loading more entities.
private final Transaction txn;
private final Query query;
// Always hands back txn; passed to PostLoadContext for the post-load callbacks.
private final CurrentTransactionProvider currentTransactionProvider;

// Pending prefetch of the next batch, or null when no continuation is in flight.
private Future<NextResultT> queryResultFuture = null;
// Running total of results skipped so far due to the offset.
private int skippedResults;
// Running total of entities handed to callers; drives the chunk-size warning.
private int totalResults = 0;
// Lazily computed by getIndexList().
private List<Index> indexList = null;
// True once the skipped-results cursor (or its null placeholder) has been emitted.
private boolean addedSkippedCursor;
private final Future<InitialResultT> initialQueryResultFuture;

/**
 * Prototype for next/continue requests.
 * This field remains null until initialQueryResultFuture is processed.
 */
private NextRequestT nextQueryPrototype = null;
/**
 * Creates a results source backed by an already-issued initial query.
 *
 * @param callbacks callbacks whose post-load hooks run for every loaded entity
 * @param fetchOptions supplies the chunk size (defaulting to {@code AT_LEAST_ONE} when
 *     unset) and the offset (defaulting to 0 when unset)
 * @param txn the transaction the query runs in; also the transaction reported to
 *     post-load callbacks
 * @param query the query being executed
 * @param initialQueryResultFuture future holding the result of the initial query call
 */
public BaseQueryResultsSource(DatastoreCallbacks callbacks, FetchOptions fetchOptions,
    final Transaction txn, Query query, Future<InitialResultT> initialQueryResultFuture) {
  this.callbacks = callbacks;

  if (fetchOptions.getChunkSize() != null) {
    this.chunkSize = fetchOptions.getChunkSize();
  } else {
    this.chunkSize = AT_LEAST_ONE;
  }

  if (fetchOptions.getOffset() != null) {
    this.offset = fetchOptions.getOffset();
  } else {
    this.offset = 0;
  }

  this.txn = txn;
  this.query = query;

  // The provider ignores the supplied default and always reports this source's transaction.
  this.currentTransactionProvider = new CurrentTransactionProvider() {
    @Override
    public Transaction getCurrentTransaction(Transaction defaultValue) {
      return txn;
    }
  };

  this.initialQueryResultFuture = initialQueryResultFuture;
  this.skippedResults = 0;
}
/**
 * Wrap an initial query result and provide a standard interface for data extraction.
 *
 * @param res the result obtained from the initial query future
 */
abstract WrappedQueryResult wrapInitialResult(InitialResultT res);
/**
 * Wrap a continue query result and provide a standard interface for data extraction.
 *
 * @param res the result obtained from a continuation call
 */
abstract WrappedQueryResult wrapResult(NextResultT res);
/**
 * Construct base object for continuing queries if more results are needed. This object
 * will be passed into {@link #makeNextCall(Object, WrappedQueryResult, Integer, Integer)}.
 *
 * @param res the result obtained from the initial query future
 */
abstract NextRequestT buildNextCallPrototype(InitialResultT res);
/**
 * Issue a continue query request to the {@link AsyncDatastoreService}.
 *
 * @param prototype the object built by {@link #buildNextCallPrototype(Object)}
 * @param latestResult the most recently processed query result
 * @param fetchCountOrNull the fetch count to request, or null to leave it unset
 * @param offsetOrNull the offset still to be skipped, or null to leave it unset
 * @return The future containing the result.
 */
abstract Future<NextResultT> makeNextCall(NextRequestT prototype,
    WrappedQueryResult latestResult, Integer fetchCountOrNull, Integer offsetOrNull);
146 @Override
147 public boolean hasMoreEntities() {
148 return (nextQueryPrototype == null || queryResultFuture != null);
151 @Override
152 public int getNumSkipped() {
153 return skippedResults;
156 @Override
157 public List<Index> getIndexList() {
158 if (indexList == null) {
159 InitialResultT res = FutureHelper.quietGet(initialQueryResultFuture);
160 Set<Index> monitoredIndexBuffer = Sets.newHashSet();
161 indexList = wrapInitialResult(res).getIndexInfo(monitoredIndexBuffer);
162 if (!monitoredIndexBuffer.isEmpty()) {
163 monitoredIndexUsageTracker.addNewUsage(monitoredIndexBuffer, query);
166 return indexList;
169 @Override
170 public Cursor loadMoreEntities(List<Entity> buffer, List<Cursor> cursorBuffer) {
171 return loadMoreEntities(AT_LEAST_ONE, buffer, cursorBuffer);
174 @Override
175 public Cursor loadMoreEntities(int numberToLoad, List<Entity> buffer, List<Cursor> cursorBuffer) {
176 TransactionImpl.ensureTxnActive(txn);
177 if (!hasMoreEntities()) {
178 return null;
181 if (numberToLoad == 0
182 && offset <= skippedResults) {
183 if (!addedSkippedCursor) {
184 cursorBuffer.add(null);
185 addedSkippedCursor = true;
187 return null;
190 WrappedQueryResult res;
191 if (nextQueryPrototype == null) {
192 getIndexList();
193 InitialResultT initialRes = FutureHelper.quietGet(initialQueryResultFuture);
194 nextQueryPrototype = buildNextCallPrototype(initialRes);
195 res = wrapInitialResult(initialRes);
196 } else {
197 res = wrapResult(FutureHelper.quietGet(queryResultFuture));
198 queryResultFuture = null;
201 int fetchedSoFar = processQueryResult(res, buffer, cursorBuffer);
203 Integer fetchCountOrNull = null;
204 Integer offsetOrNull = null;
205 if (res.hasMoreResults()) {
206 boolean setCount = true;
207 if (numberToLoad <= 0) {
208 setCount = false;
209 if (chunkSize != AT_LEAST_ONE) {
210 fetchCountOrNull = chunkSize;
212 if (numberToLoad == AT_LEAST_ONE) {
213 numberToLoad = 1;
217 while (res.hasMoreResults()
218 && (skippedResults < offset
219 || fetchedSoFar < numberToLoad)) {
220 if (skippedResults < offset) {
221 offsetOrNull = offset - skippedResults;
222 } else {
223 offsetOrNull = null;
225 if (setCount) {
226 fetchCountOrNull = Math.max(chunkSize, numberToLoad - fetchedSoFar);
228 WrappedQueryResult nextRes = wrapResult(FutureHelper.quietGet(
229 makeNextCall(nextQueryPrototype, res, fetchCountOrNull, offsetOrNull)));
230 if (!nextRes.madeProgress(res)) {
231 throw new DatastoreTimeoutException("The query was not able to make any progress.");
233 res = nextRes;
234 fetchedSoFar += processQueryResult(res, buffer, cursorBuffer);
238 if (res.hasMoreResults()) {
239 fetchCountOrNull = chunkSize != AT_LEAST_ONE ? chunkSize : null;
240 offsetOrNull = null;
241 queryResultFuture = makeNextCall(nextQueryPrototype, res, fetchCountOrNull, offsetOrNull);
243 return res.getEndCursor();
247 * Helper function to process the query results.
249 * This function adds results to the given buffer and updates {@link
250 * #skippedResults}.
252 * @param res The {@link com.google.apphosting.datastore.DatastoreV3Pb.QueryResult} to process
253 * @param buffer the buffer to which to add results
254 * @returns The number of new results added to buffer.
256 private int processQueryResult(WrappedQueryResult res, List<Entity> buffer,
257 List<Cursor> cursorBuffer) {
258 skippedResults += res.numSkippedResults();
259 if (skippedResults >= offset && !addedSkippedCursor) {
260 cursorBuffer.add(res.getSkippedResultsCursor());
261 addedSkippedCursor = true;
264 List<Entity> entityList = res.getEntities(query.getProjections());
265 buffer.addAll(entityList);
266 cursorBuffer.addAll(res.getResultCursors());
267 for (Entity entity : entityList) {
268 callbacks.executePostLoadCallbacks(new PostLoadContext(currentTransactionProvider, entity));
270 totalResults += entityList.size();
271 if (chunkSize == AT_LEAST_ONE && totalResults > CHUNK_SIZE_WARNING_RESULT_SET_SIZE_THRESHOLD
272 && System.getProperty(DISABLE_CHUNK_SIZE_WARNING_SYS_PROP) == null) {
273 logChunkSizeWarning();
275 return entityList.size();
278 void logChunkSizeWarning() {
279 long now = System.currentTimeMillis();
280 if ((now - lastChunkSizeWarning.get()) < MAX_CHUNK_SIZE_WARNING_FREQUENCY_MS) {
281 return;
283 logger.warning(
284 "This query does not have a chunk size set in FetchOptions and has returned over "
285 + CHUNK_SIZE_WARNING_RESULT_SET_SIZE_THRESHOLD + " results. If result sets of this "
286 + "size are common for this query, consider setting a chunk size to improve "
287 + "performance.\n To disable this warning set the following system property in "
288 + "appengine-web.xml (the value of the property doesn't matter): '"
289 + DISABLE_CHUNK_SIZE_WARNING_SYS_PROP + "'");
290 lastChunkSizeWarning.set(now);