1 // Copyright 2007 Google Inc. All rights reserved.
3 package com
.google
.appengine
.api
.datastore
;
5 import com
.google
.common
.collect
.Sets
;
7 import java
.util
.Collection
;
10 import java
.util
.concurrent
.Future
;
11 import java
.util
.concurrent
.atomic
.AtomicLong
;
12 import java
.util
.logging
.Logger
;
15 * Abstract base implementation of QueryResultsSource which knows how to
16 * make callbacks back into the datastore to retrieve more entities
17 * for the specified cursor.
20 abstract class BaseQueryResultsSource
<InitialResultT
, NextRequestT
, NextResultT
>
21 implements QueryResultsSource
{
24 * A common interface for working with a query result.
26 interface WrappedQueryResult
{
28 * Get the end cursor associated with the wrapped query result.
// Declared without a body; loadMoreEntities returns this Cursor for the last result it processed.
30 Cursor
getEndCursor();
33 * Get the entities included in the wrapped query result.
34 * @param projections Projections from the initial {@link Query}.
// Takes a Collection<Projection> (processQueryResult passes query.getProjections()) and
// returns the entities of this result as a List<Entity>.
36 List
<Entity
> getEntities(Collection
<Projection
> projections
);
39 * A list of Cursor objects which correspond to the entities returned
40 * by {@link #getEntities(Collection)}.
// Returns a List<Cursor>; processQueryResult appends all of it to the caller's cursor buffer
// right after appending the entities.
42 List
< Cursor
> getResultCursors();
45 * Return the cursor just after the last result skipped due to an offset.
46 */ Cursor
// Takes no arguments; processQueryResult adds the returned Cursor to the cursor buffer once
// the accumulated skipped count has reached the offset.
getSkippedResultsCursor();
49 * Indicates whether a {@link #makeNextCall(Object, Integer, Integer)} call
50 * with the wrapped object will return more results.
// Takes no arguments; loadMoreEntities uses the answer to decide whether to issue another call.
52 boolean hasMoreResults();
55 * The number of results skipped over due to an offset.
// Takes no arguments; processQueryResult adds the returned int to its running skippedResults total.
57 int numSkippedResults();
60 * Parses information about the indexes used in the query.
62 * @param monitoredIndexBuffer Indexes with the 'only use if required' flag set will be
63 * added to this buffer.
64 * @return CompositeIndexes which were used in the query.
// Accepts a Collection<Index> buffer and returns a List<Index>; getIndexList passes a fresh
// empty set as the buffer and caches the returned list.
66 List
<Index
> getIndexInfo(Collection
<Index
> monitoredIndexBuffer
);
// java.util.logging logger named after this class.
69 static Logger logger
= Logger
.getLogger(BaseQueryResultsSource
.class.getName());
// Sentinel (-1): the chunkSize fallback when FetchOptions carries no chunk size, and the
// numberToLoad used by the two-argument loadMoreEntities overload.
70 private static final int AT_LEAST_ONE
= -1;
// Name of the system property that suppresses the chunk size warning whenever it is set
// (processQueryResult only checks it for non-null).
71 private static final String DISABLE_CHUNK_SIZE_WARNING_SYS_PROP
=
72 "appengine.datastore.disableChunkSizeWarning";
// The warning is only considered once totalResults exceeds this many entities.
73 private static final int CHUNK_SIZE_WARNING_RESULT_SET_SIZE_THRESHOLD
= 1000;
// 1000 * 60 * 5 ms = five minutes; compared against the time elapsed since the last warning.
74 private static final long MAX_CHUNK_SIZE_WARNING_FREQUENCY_MS
= 1000 * 60 * 5;
// Static tracker that getIndexList notifies (addNewUsage) when the index buffer is non-empty.
75 static MonitoredIndexUsageTracker monitoredIndexUsageTracker
= new MonitoredIndexUsageTracker();
// System.currentTimeMillis() value stored by logChunkSizeWarning; starts at 0.
76 static final AtomicLong lastChunkSizeWarning
= new AtomicLong(0);
// Receives executePostLoadCallbacks for every entity added by processQueryResult.
78 private final DatastoreCallbacks callbacks
;
// Chunk size from FetchOptions, or AT_LEAST_ONE when none was given.
79 private final int chunkSize
;
// Offset from FetchOptions, or 0 when none was given.
80 private final int offset
;
// Passed to TransactionImpl.ensureTxnActive at the start of the three-argument loadMoreEntities.
81 private final Transaction txn
;
// Supplies the projections for getEntities and is handed to the monitored index tracker.
82 private final Query query
;
// Handed to each PostLoadContext built in processQueryResult.
83 private final CurrentTransactionProvider currentTransactionProvider
;
// Pending follow-up call started at the end of loadMoreEntities; consumed and reset to null
// at the start of the next invocation.
85 private Future
<NextResultT
> queryResultFuture
= null;
// Running total of numSkippedResults() across all processed results.
86 private int skippedResults
;
// Running total of entities added to caller buffers across all processed results.
87 private int totalResults
= 0;
// Cache filled on the first getIndexList call.
88 private List
<Index
> indexList
= null;
// True once an entry for the skipped-results cursor (or a null placeholder) has been added
// to a cursor buffer.
89 private boolean addedSkippedCursor
;
// Future for the first batch of results, supplied to the constructor.
90 private final Future
<InitialResultT
> initialQueryResultFuture
;
93 * Prototype for next/continue requests.
94 * This field remains null until initialQueryResultFuture is processed.
96 private NextRequestT nextQueryPrototype
= null;
// Captures the collaborators and resolves the optional FetchOptions values to primitives.
// NOTE(review): the assignments of txn and query, and the body of the anonymous
// CurrentTransactionProvider, are not visible in this view of the file.
98 public BaseQueryResultsSource(DatastoreCallbacks callbacks
, FetchOptions fetchOptions
,
99 final Transaction txn
, Query query
, Future
<InitialResultT
> initialQueryResultFuture
) {
100 this.callbacks
= callbacks
;
// A missing chunk size is represented by the AT_LEAST_ONE sentinel.
101 this.chunkSize
= fetchOptions
.getChunkSize() != null
102 ? fetchOptions
.getChunkSize() : AT_LEAST_ONE
;
// A missing offset means nothing is skipped.
103 this.offset
= fetchOptions
.getOffset() != null ? fetchOptions
.getOffset() : 0;
// Anonymous provider later passed to PostLoadContext; txn is declared final in the
// signature, presumably so this anonymous class can capture it — TODO confirm.
106 this.currentTransactionProvider
= new CurrentTransactionProvider() {
108 public Transaction
getCurrentTransaction(Transaction defaultValue
) {
112 this.initialQueryResultFuture
= initialQueryResultFuture
;
113 this.skippedResults
= 0;
117 * Wrap an initial query result and provide a standard interface for data extraction.
// Subclass hook: adapts the InitialResultT obtained from initialQueryResultFuture to the
// WrappedQueryResult interface. Called from getIndexList and from loadMoreEntities.
119 abstract WrappedQueryResult
wrapInitialResult(InitialResultT res
);
122 * Wrap a continue query result and provide a standard interface for data extraction.
// Subclass hook: adapts the NextResultT produced by makeNextCall to the WrappedQueryResult
// interface.
124 abstract WrappedQueryResult
wrapResult(NextResultT res
);
127 * Construct base object for continuing queries if more results are needed. This object
128 * will be passed into {@link #makeNextCall(Object, Integer, Integer)}.
// Subclass hook: derives the NextRequestT stored in nextQueryPrototype from the initial
// result; loadMoreEntities calls it once, while nextQueryPrototype is still null.
130 abstract NextRequestT
buildNextCallPrototype(InitialResultT res
);
133 * Issue a continue query request to the {@link AsyncDatastoreService}.
134 * @return The future containing the result.
// Subclass hook: both Integer arguments may be null, and loadMoreEntities passes null when
// it has no fetch count or no remaining offset to request.
136 abstract Future
<NextResultT
> makeNextCall(NextRequestT prototype
, Integer fetchCountOrNull
,
137 Integer offsetOrNull
);
// True while the initial result has not been processed yet (nextQueryPrototype is still
// null) or a follow-up call is pending (queryResultFuture is non-null).
140 public boolean hasMoreEntities() {
141 return (nextQueryPrototype
== null || queryResultFuture
!= null);
// Returns the running total of skipped results accumulated by processQueryResult.
145 public int getNumSkipped() {
146 return skippedResults
;
// Computes indexList on first use and caches it in the field.
// NOTE(review): the return statement and closing braces are not visible in this view.
150 public List
<Index
> getIndexList() {
151 if (indexList
== null) {
// Obtains the initial result through FutureHelper.quietGet.
152 InitialResultT res
= FutureHelper
.quietGet(initialQueryResultFuture
);
// getIndexInfo may add indexes to this buffer as a side channel.
153 Set
<Index
> monitoredIndexBuffer
= Sets
.newHashSet();
154 indexList
= wrapInitialResult(res
).getIndexInfo(monitoredIndexBuffer
);
// Report usage to the static tracker only when something was buffered.
155 if (!monitoredIndexBuffer
.isEmpty()) {
156 monitoredIndexUsageTracker
.addNewUsage(monitoredIndexBuffer
, query
);
// Convenience overload: delegates to the three-argument form with numberToLoad set to the
// AT_LEAST_ONE sentinel.
163 public Cursor
loadMoreEntities(List
<Entity
> buffer
, List
<Cursor
> cursorBuffer
) {
164 return loadMoreEntities(AT_LEAST_ONE
, buffer
, cursorBuffer
);
// Appends entities to buffer and cursors to cursorBuffer, issuing follow-up calls as needed,
// and returns the end cursor of the last result processed.
// NOTE(review): several statements and all closing braces of this method (including the
// else branches and the fall-through return) are not visible in this view; the comments
// below describe only the visible statements.
168 public Cursor
loadMoreEntities(int numberToLoad
, List
<Entity
> buffer
, List
<Cursor
> cursorBuffer
) {
// Checks the transaction first, via TransactionImpl.ensureTxnActive.
169 TransactionImpl
.ensureTxnActive(txn
);
// Same condition as hasMoreEntities(): initial result unprocessed, or a follow-up pending.
170 if (nextQueryPrototype
== null || queryResultFuture
!= null) {
// Zero entities requested and the offset already satisfied: only the skipped-results slot
// of the cursor buffer is filled, once, with a null placeholder.
171 if (numberToLoad
== 0 &&
172 offset
<= skippedResults
) {
173 if (!addedSkippedCursor
) {
174 cursorBuffer
.add(null);
175 addedSkippedCursor
= true;
179 WrappedQueryResult res
;
// First pass: take the initial result and derive the prototype for follow-up requests.
180 if (nextQueryPrototype
== null) {
182 InitialResultT initialRes
= FutureHelper
.quietGet(initialQueryResultFuture
);
183 nextQueryPrototype
= buildNextCallPrototype(initialRes
);
184 res
= wrapInitialResult(initialRes
);
// Otherwise (presumably the else branch — not visible here): take the pending follow-up
// result and clear the future.
186 res
= wrapResult(FutureHelper
.quietGet(queryResultFuture
));
187 queryResultFuture
= null;
// Number of entities delivered to buffer during this invocation.
190 int fetchedSoFar
= processQueryResult(res
, buffer
, cursorBuffer
);
192 Integer fetchCountOrNull
= null;
193 Integer offsetOrNull
= null;
194 if (res
.hasMoreResults()) {
195 boolean setCount
= true;
// Non-positive numberToLoad: use the configured chunk size, if there is one, as the count.
196 if (numberToLoad
<= 0) {
198 if (chunkSize
!= AT_LEAST_ONE
) {
199 fetchCountOrNull
= chunkSize
;
201 if (numberToLoad
== AT_LEAST_ONE
) {
// Keep calling synchronously while more results exist and either the offset has not been
// fully skipped or fewer than numberToLoad entities have been fetched.
206 while (res
.hasMoreResults()
207 && (skippedResults
< offset
208 || fetchedSoFar
< numberToLoad
)) {
// Ask the next call to skip whatever part of the offset is still outstanding.
209 if (skippedResults
< offset
) {
210 offsetOrNull
= offset
- skippedResults
;
// Request the larger of the chunk size and the number of entities still missing.
215 fetchCountOrNull
= Math
.max(chunkSize
, numberToLoad
- fetchedSoFar
);
217 res
= wrapResult(FutureHelper
.quietGet(
218 makeNextCall(nextQueryPrototype
, fetchCountOrNull
, offsetOrNull
)));
219 fetchedSoFar
+= processQueryResult(res
, buffer
, cursorBuffer
);
// More results remain: start the next call now and keep its future for the next invocation.
223 if (res
.hasMoreResults()) {
224 fetchCountOrNull
= chunkSize
!= AT_LEAST_ONE ? chunkSize
: null;
226 queryResultFuture
= makeNextCall(nextQueryPrototype
, fetchCountOrNull
, offsetOrNull
);
228 return res
.getEndCursor();
234 * Helper function to process the query results.
236 * This function adds results to the given buffer and updates {@link #skippedResults} and {@link #totalResults}.
239 * @param res The {@link com.google.apphosting.datastore.DatastoreV3Pb.QueryResult} to process
240 * @param buffer the buffer to which to add results
241 * @return The number of new results added to buffer.
// Folds one wrapped result into the caller's buffers and this object's running counters,
// and returns the number of entities that this result contributed.
243 private int processQueryResult(WrappedQueryResult res
, List
<Entity
> buffer
,
244 List
<Cursor
> cursorBuffer
) {
245 skippedResults
+= res
.numSkippedResults();
// Once the offset has been fully skipped, record the skipped-results cursor exactly once.
246 if (skippedResults
>= offset
&& !addedSkippedCursor
) {
247 cursorBuffer
.add(res
.getSkippedResultsCursor());
248 addedSkippedCursor
= true;
251 List
<Entity
> entityList
= res
.getEntities(query
.getProjections());
// Entities and their cursors are appended in the same order, one after the other.
252 buffer
.addAll(entityList
);
253 cursorBuffer
.addAll(res
.getResultCursors());
// Runs post-load callbacks for each entity, after it has been added to the buffer.
254 for (Entity entity
: entityList
) {
255 callbacks
.executePostLoadCallbacks(new PostLoadContext(currentTransactionProvider
, entity
));
257 totalResults
+= entityList
.size();
// Warn only when no chunk size was configured, the running total is past the threshold,
// and the disabling system property is not set.
258 if (chunkSize
== AT_LEAST_ONE
&& totalResults
> CHUNK_SIZE_WARNING_RESULT_SET_SIZE_THRESHOLD
259 && System
.getProperty(DISABLE_CHUNK_SIZE_WARNING_SYS_PROP
) == null) {
260 logChunkSizeWarning();
262 return entityList
.size();
// Builds the chunk size warning message and stores the current time in lastChunkSizeWarning.
// NOTE(review): the statements between the rate check and the message (original lines
// 268-270) are not visible in this view, so the effect of the check cannot be confirmed here.
// NOTE(review): get() and set() on lastChunkSizeWarning are separate operations, so two
// threads could both pass the check — confirm whether that matters.
265 void logChunkSizeWarning() {
266 long now
= System
.currentTimeMillis();
// Compares the time since the stored timestamp against MAX_CHUNK_SIZE_WARNING_FREQUENCY_MS.
267 if ((now
- lastChunkSizeWarning
.get()) < MAX_CHUNK_SIZE_WARNING_FREQUENCY_MS
) {
271 "This query does not have a chunk size set in FetchOptions and has returned over "
272 + CHUNK_SIZE_WARNING_RESULT_SET_SIZE_THRESHOLD
+ " results. If result sets of this "
273 + "size are common for this query, consider setting a chunk size to improve "
274 + "performance.\n To disable this warning set the following system property in "
275 + "appengine-web.xml (the value of the property doesn't matter): '"
276 + DISABLE_CHUNK_SIZE_WARNING_SYS_PROP
+ "'");
// Stores the timestamp that the next rate check will read.
277 lastChunkSizeWarning
.set(now
);