Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / datastore / BaseQueryResultsSource.java
bloba40e7ca7d3a19e45ce29ec69ec3e135444295af8
1 // Copyright 2007 Google Inc. All rights reserved.
3 package com.google.appengine.api.datastore;
5 import com.google.common.collect.Sets;
7 import java.util.Collection;
8 import java.util.List;
9 import java.util.Set;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.atomic.AtomicLong;
12 import java.util.logging.Logger;
14 /**
15 * Concrete implementation of QueryResultsSource which knows how to
16 * make callbacks back into the datastore to retrieve more entities
17 * for the specified cursor.
20 abstract class BaseQueryResultsSource<InitialResultT, NextRequestT, NextResultT>
21 implements QueryResultsSource {
23 /**
24 * A common interface for working with a query result.
26 interface WrappedQueryResult {
27 /**
28 * Get the end cursor associated with the wrapped query result.
30 Cursor getEndCursor();
32 /**
33 * Get the entities included in the wrapped query result.
34 * @param projections Projections from the initial {@link Query}.
36 List<Entity> getEntities(Collection<Projection> projections);
38 /**
39 * A list of Cursor objections which correspond to the entities returned
40 * by {@link #getEntities(Collection)}.
42 List< Cursor> getResultCursors();
44 /**
45 * Return the cursor just after the last result skipped due to an offset.
46 */ Cursor getSkippedResultsCursor();
48 /**
49 * Indices whether a {@link #makeNextCall(Object, WrappedQueryResult, Integer, Integer)} call
50 * with the wrapped object will return more results.
52 boolean hasMoreResults();
54 /**
55 * The number of results skipped over due to an offset.
57 int numSkippedResults();
59 /**
60 * Parses information about the indexes used in the query.
62 * @param monitoredIndexBuffer Indexes with the 'only use if required' flag set will be
63 * added to this buffer.
64 * @returns CompositeIndexes which were used in the query.
66 List<Index> getIndexInfo(Collection<Index> monitoredIndexBuffer);
69 static Logger logger = Logger.getLogger(BaseQueryResultsSource.class.getName());
70 private static final int AT_LEAST_ONE = -1;
71 private static final String DISABLE_CHUNK_SIZE_WARNING_SYS_PROP =
72 "appengine.datastore.disableChunkSizeWarning";
73 private static final int CHUNK_SIZE_WARNING_RESULT_SET_SIZE_THRESHOLD = 1000;
74 private static final long MAX_CHUNK_SIZE_WARNING_FREQUENCY_MS = 1000 * 60 * 5;
75 static MonitoredIndexUsageTracker monitoredIndexUsageTracker = new MonitoredIndexUsageTracker();
76 static final AtomicLong lastChunkSizeWarning = new AtomicLong(0);
78 private final DatastoreCallbacks callbacks;
79 private final int chunkSize;
80 private final int offset;
81 private final Transaction txn;
82 private final Query query;
83 private final CurrentTransactionProvider currentTransactionProvider;
85 private Future<NextResultT> queryResultFuture = null;
86 private int skippedResults;
87 private int totalResults = 0;
88 private List<Index> indexList = null;
89 private boolean addedSkippedCursor;
90 private final Future<InitialResultT> initialQueryResultFuture;
92 /**
93 * Prototype for next/continue requests.
94 * This field remains null until initialQueryResultFuture is processed.
96 private NextRequestT nextQueryPrototype = null;
98 public BaseQueryResultsSource(DatastoreCallbacks callbacks, FetchOptions fetchOptions,
99 final Transaction txn, Query query, Future<InitialResultT> initialQueryResultFuture) {
100 this.callbacks = callbacks;
101 this.chunkSize = fetchOptions.getChunkSize() != null
102 ? fetchOptions.getChunkSize() : AT_LEAST_ONE;
103 this.offset = fetchOptions.getOffset() != null ? fetchOptions.getOffset() : 0;
104 this.txn = txn;
105 this.query = query;
106 this.currentTransactionProvider = new CurrentTransactionProvider() {
107 @Override
108 public Transaction getCurrentTransaction(Transaction defaultValue) {
109 return txn;
112 this.initialQueryResultFuture = initialQueryResultFuture;
113 this.skippedResults = 0;
117 * Wrap an initial query result and provide a standard interface for data extraction.
119 abstract WrappedQueryResult wrapInitialResult(InitialResultT res);
122 * Wrap a continue query request and provide a standard interface for data extraction.
124 abstract WrappedQueryResult wrapResult(NextResultT res);
127 * Construct base object for continuing queries if more results are needed. This object
128 * will passed into {@link #makeNextCall(Object, WrappedQueryResult, Integer, Integer)}.
130 abstract NextRequestT buildNextCallPrototype(InitialResultT res);
133 * Issue a continue query request to the {@link AsyncDatastoreService}.
134 * @returns The future containing the result.
136 abstract Future<NextResultT> makeNextCall(NextRequestT prototype,
137 WrappedQueryResult latestResult, Integer fetchCountOrNull, Integer offsetOrNull);
139 @Override
140 public boolean hasMoreEntities() {
141 return (nextQueryPrototype == null || queryResultFuture != null);
144 @Override
145 public int getNumSkipped() {
146 return skippedResults;
149 @Override
150 public List<Index> getIndexList() {
151 if (indexList == null) {
152 InitialResultT res = FutureHelper.quietGet(initialQueryResultFuture);
153 Set<Index> monitoredIndexBuffer = Sets.newHashSet();
154 indexList = wrapInitialResult(res).getIndexInfo(monitoredIndexBuffer);
155 if (!monitoredIndexBuffer.isEmpty()) {
156 monitoredIndexUsageTracker.addNewUsage(monitoredIndexBuffer, query);
159 return indexList;
162 @Override
163 public Cursor loadMoreEntities(List<Entity> buffer, List<Cursor> cursorBuffer) {
164 return loadMoreEntities(AT_LEAST_ONE, buffer, cursorBuffer);
167 @Override
168 public Cursor loadMoreEntities(int numberToLoad, List<Entity> buffer, List<Cursor> cursorBuffer) {
169 TransactionImpl.ensureTxnActive(txn);
170 if (nextQueryPrototype == null || queryResultFuture != null) {
171 if (numberToLoad == 0 &&
172 offset <= skippedResults) {
173 if (!addedSkippedCursor) {
174 cursorBuffer.add(null);
175 addedSkippedCursor = true;
177 return null;
179 WrappedQueryResult res;
180 if (nextQueryPrototype == null) {
181 getIndexList();
182 InitialResultT initialRes = FutureHelper.quietGet(initialQueryResultFuture);
183 nextQueryPrototype = buildNextCallPrototype(initialRes);
184 res = wrapInitialResult(initialRes);
185 } else {
186 res = wrapResult(FutureHelper.quietGet(queryResultFuture));
187 queryResultFuture = null;
190 int fetchedSoFar = processQueryResult(res, buffer, cursorBuffer);
192 Integer fetchCountOrNull = null;
193 Integer offsetOrNull = null;
194 if (res.hasMoreResults()) {
195 boolean setCount = true;
196 if (numberToLoad <= 0) {
197 setCount = false;
198 if (chunkSize != AT_LEAST_ONE) {
199 fetchCountOrNull = chunkSize;
201 if (numberToLoad == AT_LEAST_ONE) {
202 numberToLoad = 1;
206 while (res.hasMoreResults()
207 && (skippedResults < offset
208 || fetchedSoFar < numberToLoad)) {
209 if (skippedResults < offset) {
210 offsetOrNull = offset - skippedResults;
211 } else {
212 offsetOrNull = null;
214 if (setCount) {
215 fetchCountOrNull = Math.max(chunkSize, numberToLoad - fetchedSoFar);
217 res = wrapResult(FutureHelper.quietGet(
218 makeNextCall(nextQueryPrototype, res, fetchCountOrNull, offsetOrNull)));
219 fetchedSoFar += processQueryResult(res, buffer, cursorBuffer);
223 if (res.hasMoreResults()) {
224 fetchCountOrNull = chunkSize != AT_LEAST_ONE ? chunkSize : null;
225 offsetOrNull = null;
226 queryResultFuture = makeNextCall(nextQueryPrototype, res, fetchCountOrNull, offsetOrNull);
228 return res.getEndCursor();
230 return null;
234 * Helper function to process the query results.
236 * This function adds results to the given buffer and updates {@link
237 * #skippedResults}.
239 * @param res The {@link com.google.apphosting.datastore.DatastoreV3Pb.QueryResult} to process
240 * @param buffer the buffer to which to add results
241 * @returns The number of new results added to buffer.
243 private int processQueryResult(WrappedQueryResult res, List<Entity> buffer,
244 List<Cursor> cursorBuffer) {
245 skippedResults += res.numSkippedResults();
246 if (skippedResults >= offset && !addedSkippedCursor) {
247 cursorBuffer.add(res.getSkippedResultsCursor());
248 addedSkippedCursor = true;
251 List<Entity> entityList = res.getEntities(query.getProjections());
252 buffer.addAll(entityList);
253 cursorBuffer.addAll(res.getResultCursors());
254 for (Entity entity : entityList) {
255 callbacks.executePostLoadCallbacks(new PostLoadContext(currentTransactionProvider, entity));
257 totalResults += entityList.size();
258 if (chunkSize == AT_LEAST_ONE && totalResults > CHUNK_SIZE_WARNING_RESULT_SET_SIZE_THRESHOLD
259 && System.getProperty(DISABLE_CHUNK_SIZE_WARNING_SYS_PROP) == null) {
260 logChunkSizeWarning();
262 return entityList.size();
265 void logChunkSizeWarning() {
266 long now = System.currentTimeMillis();
267 if ((now - lastChunkSizeWarning.get()) < MAX_CHUNK_SIZE_WARNING_FREQUENCY_MS) {
268 return;
270 logger.warning(
271 "This query does not have a chunk size set in FetchOptions and has returned over "
272 + CHUNK_SIZE_WARNING_RESULT_SET_SIZE_THRESHOLD + " results. If result sets of this "
273 + "size are common for this query, consider setting a chunk size to improve "
274 + "performance.\n To disable this warning set the following system property in "
275 + "appengine-web.xml (the value of the property doesn't matter): '"
276 + DISABLE_CHUNK_SIZE_WARNING_SYS_PROP + "'");
277 lastChunkSizeWarning.set(now);